mirror of https://github.com/status-im/consul.git
Upgrade test: test peering upgrade from an old version of consul (#15768)
* upgrade test: test peering upgrade from an old version of consul NET-1809
This commit is contained in:
parent
9fcfe8d7c5
commit
692a6edd7d
|
@ -16,7 +16,7 @@ type Agent interface {
|
||||||
IsServer() bool
|
IsServer() bool
|
||||||
RegisterTermination(func() error)
|
RegisterTermination(func() error)
|
||||||
Terminate() error
|
Terminate() error
|
||||||
Upgrade(ctx context.Context, config Config, index int) error
|
Upgrade(ctx context.Context, config Config) error
|
||||||
Exec(ctx context.Context, cmd []string) (int, error)
|
Exec(ctx context.Context, cmd []string) (int, error)
|
||||||
DataDir() string
|
DataDir() string
|
||||||
}
|
}
|
||||||
|
|
|
@ -192,6 +192,11 @@ func (b *Builder) ToAgentConfig() (*Config, error) {
|
||||||
Version: *utils.TargetVersion,
|
Version: *utils.TargetVersion,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Override the default version
|
||||||
|
if b.context != nil && b.context.consulVersion != "" {
|
||||||
|
conf.Version = b.context.consulVersion
|
||||||
|
}
|
||||||
|
|
||||||
return conf, nil
|
return conf, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -39,6 +39,7 @@ type consulContainerNode struct {
|
||||||
dataDir string
|
dataDir string
|
||||||
network string
|
network string
|
||||||
id int
|
id int
|
||||||
|
name string
|
||||||
terminateFuncs []func() error
|
terminateFuncs []func() error
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -124,12 +125,14 @@ func NewConsulContainer(ctx context.Context, config Config, network string, inde
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := consulContainer.StartLogProducer(ctx); err != nil {
|
if *utils.FollowLog {
|
||||||
return nil, err
|
if err := consulContainer.StartLogProducer(ctx); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
consulContainer.FollowOutput(&LogConsumer{
|
||||||
|
Prefix: name,
|
||||||
|
})
|
||||||
}
|
}
|
||||||
consulContainer.FollowOutput(&LogConsumer{
|
|
||||||
Prefix: name,
|
|
||||||
})
|
|
||||||
|
|
||||||
uri, err := podContainer.Endpoint(ctx, "http")
|
uri, err := podContainer.Endpoint(ctx, "http")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -158,6 +161,7 @@ func NewConsulContainer(ctx context.Context, config Config, network string, inde
|
||||||
certDir: tmpCertData,
|
certDir: tmpCertData,
|
||||||
network: network,
|
network: network,
|
||||||
id: index,
|
id: index,
|
||||||
|
name: name,
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -199,20 +203,12 @@ func (c *consulContainerNode) Exec(ctx context.Context, cmd []string) (int, erro
|
||||||
return c.container.Exec(ctx, cmd)
|
return c.container.Exec(ctx, cmd)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *consulContainerNode) Upgrade(ctx context.Context, config Config, index int) error {
|
// Upgrade terminates a running container and create a new one using the provided config.
|
||||||
pc, err := readSomeConfigFileFields(config.JSON)
|
// The upgraded node will
|
||||||
if err != nil {
|
// - use the same node name and the data dir as the old version node
|
||||||
return err
|
func (c *consulContainerNode) Upgrade(ctx context.Context, config Config) error {
|
||||||
}
|
// Reuse the node name since we assume upgrade on the same node
|
||||||
|
config.Cmd = append(config.Cmd, "-node", c.name)
|
||||||
consulType := "client"
|
|
||||||
if pc.Server {
|
|
||||||
consulType = "server"
|
|
||||||
}
|
|
||||||
name := utils.RandName(fmt.Sprintf("%s-consul-%s-%d", pc.Datacenter, consulType, index))
|
|
||||||
|
|
||||||
// Inject new Agent name
|
|
||||||
config.Cmd = append(config.Cmd, "-node", name)
|
|
||||||
|
|
||||||
file, err := createConfigFile(config.JSON)
|
file, err := createConfigFile(config.JSON)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -238,29 +234,38 @@ func (c *consulContainerNode) Upgrade(ctx context.Context, config Config, index
|
||||||
}
|
}
|
||||||
_, consulReq2 := newContainerRequest(config, opts)
|
_, consulReq2 := newContainerRequest(config, opts)
|
||||||
consulReq2.Env = c.consulReq.Env // copy license
|
consulReq2.Env = c.consulReq.Env // copy license
|
||||||
|
fmt.Printf("Upgraded node %s config:%s\n", c.name, file)
|
||||||
|
|
||||||
if c.container != nil {
|
if c.container != nil && *utils.FollowLog {
|
||||||
_ = c.container.StopLogProducer()
|
err = c.container.StopLogProducer()
|
||||||
if err := c.container.Terminate(c.ctx); err != nil {
|
time.Sleep(2 * time.Second)
|
||||||
return err
|
if err != nil {
|
||||||
|
fmt.Printf("WARN: error stop log producer: %v", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if err = c.container.Terminate(c.ctx); err != nil {
|
||||||
|
return fmt.Errorf("error terminate running container: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
c.consulReq = consulReq2
|
c.consulReq = consulReq2
|
||||||
|
|
||||||
|
time.Sleep(5 * time.Second)
|
||||||
container, err := startContainer(ctx, c.consulReq)
|
container, err := startContainer(ctx, c.consulReq)
|
||||||
c.ctx = ctx
|
|
||||||
c.container = container
|
c.container = container
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
c.ctx = ctx
|
||||||
|
|
||||||
if err := container.StartLogProducer(ctx); err != nil {
|
if *utils.FollowLog {
|
||||||
return err
|
if err := container.StartLogProducer(ctx); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
container.FollowOutput(&LogConsumer{
|
||||||
|
Prefix: c.name,
|
||||||
|
})
|
||||||
}
|
}
|
||||||
container.FollowOutput(&LogConsumer{
|
|
||||||
Prefix: name,
|
|
||||||
})
|
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -283,7 +288,7 @@ func (c *consulContainerNode) Terminate() error {
|
||||||
}
|
}
|
||||||
|
|
||||||
state, err := c.container.State(context.Background())
|
state, err := c.container.State(context.Background())
|
||||||
if err == nil && state.Running {
|
if err == nil && state.Running && *utils.FollowLog {
|
||||||
// StopLogProducer can only be called on running containers
|
// StopLogProducer can only be called on running containers
|
||||||
err = c.container.StopLogProducer()
|
err = c.container.StopLogProducer()
|
||||||
if err1 := c.container.Terminate(c.ctx); err == nil {
|
if err1 := c.container.Terminate(c.ctx); err == nil {
|
||||||
|
|
|
@ -12,7 +12,7 @@ import (
|
||||||
// PeeringStatus verifies the peering connection is the specified state with a default retry.
|
// PeeringStatus verifies the peering connection is the specified state with a default retry.
|
||||||
func PeeringStatus(t *testing.T, client *api.Client, peerName string, status api.PeeringState) {
|
func PeeringStatus(t *testing.T, client *api.Client, peerName string, status api.PeeringState) {
|
||||||
failer := func() *retry.Timer {
|
failer := func() *retry.Timer {
|
||||||
return &retry.Timer{Timeout: 20 * time.Second, Wait: defaultWait}
|
return &retry.Timer{Timeout: 180 * time.Second, Wait: defaultWait}
|
||||||
}
|
}
|
||||||
|
|
||||||
retry.RunWith(failer(), t, func(r *retry.R) {
|
retry.RunWith(failer(), t, func(r *retry.R) {
|
||||||
|
|
|
@ -128,19 +128,22 @@ func (c *Cluster) Remove(n libagent.Agent) error {
|
||||||
// https://developer.hashicorp.com/consul/docs/upgrading#standard-upgrades
|
// https://developer.hashicorp.com/consul/docs/upgrading#standard-upgrades
|
||||||
//
|
//
|
||||||
// - takes a snapshot
|
// - takes a snapshot
|
||||||
// - terminate and rejoin the new version of consul
|
// - terminate and rejoin the pod of new version of consul
|
||||||
func (c *Cluster) StandardUpgrade(t *testing.T, ctx context.Context, targetVersion string) error {
|
func (c *Cluster) StandardUpgrade(t *testing.T, ctx context.Context, targetVersion string) error {
|
||||||
execCode, err := c.Agents[0].Exec(context.Background(), []string{"consul", "snapshot", "save", "backup.snap"})
|
retry.RunWith(&retry.Timer{Timeout: 30 * time.Second, Wait: 1 * time.Second}, t, func(r *retry.R) {
|
||||||
if execCode != 0 {
|
// NOTE: to suppress flakiness
|
||||||
return fmt.Errorf("error taking snapshot of the cluster, returned code %d", execCode)
|
execCode, err := c.Agents[0].Exec(context.Background(), []string{"consul", "snapshot", "save", "backup.snap"})
|
||||||
}
|
require.Equal(r, 0, execCode)
|
||||||
if err != nil {
|
require.NoError(r, err)
|
||||||
return err
|
})
|
||||||
}
|
|
||||||
|
|
||||||
// verify only the leader can take a snapshot
|
// verify only the leader can take a snapshot
|
||||||
snapshotCount := 0
|
snapshotCount := 0
|
||||||
for _, agent := range c.Agents {
|
for _, agent := range c.Agents {
|
||||||
|
if !agent.IsServer() {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
files, err := ioutil.ReadDir(filepath.Join(agent.DataDir(), "raft", "snapshots"))
|
files, err := ioutil.ReadDir(filepath.Join(agent.DataDir(), "raft", "snapshots"))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
@ -149,31 +152,49 @@ func (c *Cluster) StandardUpgrade(t *testing.T, ctx context.Context, targetVersi
|
||||||
snapshotCount++
|
snapshotCount++
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
require.Equalf(t, 1, snapshotCount, "only leader agent can have a snapshot file")
|
||||||
|
|
||||||
if snapshotCount != 1 {
|
// Upgrade individual agent to the target version in the following order
|
||||||
return fmt.Errorf("only leader agent can have a snapshot file, got %d", snapshotCount)
|
// 1. followers
|
||||||
}
|
// 2. leader
|
||||||
|
// 3. clients (TODO)
|
||||||
|
leader, err := c.Leader()
|
||||||
|
client := leader.GetClient()
|
||||||
|
require.NoError(t, err)
|
||||||
|
t.Log("Leader name:", leader.GetName())
|
||||||
|
|
||||||
|
followers, err := c.Followers()
|
||||||
|
require.NoError(t, err)
|
||||||
|
t.Log("The number of followers", len(followers))
|
||||||
|
|
||||||
|
for _, agent := range followers {
|
||||||
|
t.Log("Upgrade follower", agent.GetName())
|
||||||
|
|
||||||
// Upgrade individual agent to the target version
|
|
||||||
client := c.Agents[0].GetClient()
|
|
||||||
for _, agent := range c.Agents {
|
|
||||||
agent.Terminate()
|
|
||||||
if len(c.Agents) > 3 {
|
|
||||||
WaitForLeader(t, c, client)
|
|
||||||
} else {
|
|
||||||
time.Sleep(1 * time.Second)
|
|
||||||
}
|
|
||||||
config := agent.GetConfig()
|
config := agent.GetConfig()
|
||||||
config.Version = targetVersion
|
config.Version = targetVersion
|
||||||
err = agent.Upgrade(context.Background(), config, 1)
|
err = agent.Upgrade(context.Background(), config)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// wait until the agent rejoin
|
|
||||||
WaitForLeader(t, c, client)
|
|
||||||
WaitForMembers(t, client, len(c.Agents))
|
WaitForMembers(t, client, len(c.Agents))
|
||||||
|
break
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if len(followers) > 0 {
|
||||||
|
client = followers[0].GetClient()
|
||||||
|
}
|
||||||
|
|
||||||
|
t.Log("Upgrade leader:", leader.GetName())
|
||||||
|
config := leader.GetConfig()
|
||||||
|
config.Version = targetVersion
|
||||||
|
err = leader.Upgrade(context.Background(), config)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
WaitForMembers(t, client, len(c.Agents))
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -269,6 +290,31 @@ func (c *Cluster) Clients() ([]libagent.Agent, error) {
|
||||||
return clients, nil
|
return clients, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// PeerWithCluster establishes peering with the acceptor cluster
|
||||||
|
func (c *Cluster) PeerWithCluster(acceptingClient *api.Client, acceptingPeerName string, dialingPeerName string) error {
|
||||||
|
node := c.Agents[0]
|
||||||
|
dialingClient := node.GetClient()
|
||||||
|
|
||||||
|
generateReq := api.PeeringGenerateTokenRequest{
|
||||||
|
PeerName: acceptingPeerName,
|
||||||
|
}
|
||||||
|
generateRes, _, err := acceptingClient.Peerings().GenerateToken(context.Background(), generateReq, &api.WriteOptions{})
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("error generate token: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
establishReq := api.PeeringEstablishRequest{
|
||||||
|
PeerName: dialingPeerName,
|
||||||
|
PeeringToken: generateRes.PeeringToken,
|
||||||
|
}
|
||||||
|
_, _, err = dialingClient.Peerings().Establish(context.Background(), establishReq, &api.WriteOptions{})
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("error establish peering: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
const retryTimeout = 90 * time.Second
|
const retryTimeout = 90 * time.Second
|
||||||
const retryFrequency = 500 * time.Millisecond
|
const retryFrequency = 500 * time.Millisecond
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,145 @@
|
||||||
|
package cluster
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
|
||||||
|
"github.com/hashicorp/consul/api"
|
||||||
|
// "github.com/hashicorp/consul/sdk/testutil/retry"
|
||||||
|
libagent "github.com/hashicorp/consul/test/integration/consul-container/libs/agent"
|
||||||
|
libassert "github.com/hashicorp/consul/test/integration/consul-container/libs/assert"
|
||||||
|
libservice "github.com/hashicorp/consul/test/integration/consul-container/libs/service"
|
||||||
|
"github.com/hashicorp/consul/test/integration/consul-container/libs/utils"
|
||||||
|
)
|
||||||
|
|
||||||
|
// creatingAcceptingClusterAndSetup creates a cluster with 3 servers and 1 client.
|
||||||
|
// It also creates and registers a service+sidecar.
|
||||||
|
// The API client returned is pointed at the client agent.
|
||||||
|
func CreatingAcceptingClusterAndSetup(t *testing.T, numServer int, version string, acceptingPeerName string) (*Cluster, *api.Client, *libagent.BuildContext) {
|
||||||
|
var configs []libagent.Config
|
||||||
|
|
||||||
|
opts := libagent.BuildOptions{
|
||||||
|
InjectAutoEncryption: true,
|
||||||
|
InjectGossipEncryption: true,
|
||||||
|
ConsulVersion: version,
|
||||||
|
}
|
||||||
|
ctx, err := libagent.NewBuildContext(opts)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
for i := 0; i < numServer; i++ {
|
||||||
|
serverConf, err := libagent.NewConfigBuilder(ctx).
|
||||||
|
Bootstrap(numServer).
|
||||||
|
Peering(true).
|
||||||
|
RetryJoin(fmt.Sprintf("agent-%d", (i+1)%3)). // Round-robin join the servers
|
||||||
|
ToAgentConfig()
|
||||||
|
require.NoError(t, err)
|
||||||
|
t.Logf("dc1 server config %d: \n%s", i, serverConf.JSON)
|
||||||
|
|
||||||
|
configs = append(configs, *serverConf)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Add a stable client to register the service
|
||||||
|
clientConf, err := libagent.NewConfigBuilder(ctx).
|
||||||
|
Client().
|
||||||
|
Peering(true).
|
||||||
|
RetryJoin("agent-0", "agent-1", "agent-2").
|
||||||
|
ToAgentConfig()
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
t.Logf("dc1 client config: \n%s", clientConf.JSON)
|
||||||
|
|
||||||
|
configs = append(configs, *clientConf)
|
||||||
|
|
||||||
|
cluster, err := New(configs)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
// Use the client agent as the HTTP endpoint since we will not rotate it
|
||||||
|
clientNode := cluster.Agents[numServer]
|
||||||
|
client := clientNode.GetClient()
|
||||||
|
WaitForLeader(t, cluster, client)
|
||||||
|
WaitForMembers(t, client, numServer+1)
|
||||||
|
|
||||||
|
// Default Proxy Settings
|
||||||
|
ok, err := utils.ApplyDefaultProxySettings(client)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.True(t, ok)
|
||||||
|
|
||||||
|
// Create the mesh gateway for dataplane traffic
|
||||||
|
_, err = libservice.NewGatewayService(context.Background(), "mesh", "mesh", clientNode)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
// Create a service and proxy instance
|
||||||
|
_, _, err = libservice.CreateAndRegisterStaticServerAndSidecar(clientNode)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
libassert.CatalogServiceExists(t, client, "static-server")
|
||||||
|
libassert.CatalogServiceExists(t, client, "static-server-sidecar-proxy")
|
||||||
|
|
||||||
|
// Export the service
|
||||||
|
config := &api.ExportedServicesConfigEntry{
|
||||||
|
Name: "default",
|
||||||
|
Services: []api.ExportedService{
|
||||||
|
{
|
||||||
|
Name: "static-server",
|
||||||
|
Consumers: []api.ServiceConsumer{
|
||||||
|
// TODO: need to handle the changed field name in 1.13
|
||||||
|
{Peer: acceptingPeerName},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
ok, _, err = client.ConfigEntries().Set(config, &api.WriteOptions{})
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.True(t, ok)
|
||||||
|
|
||||||
|
return cluster, client, ctx
|
||||||
|
}
|
||||||
|
|
||||||
|
// createDialingClusterAndSetup creates a cluster for peering with a single dev agent
|
||||||
|
func CreateDialingClusterAndSetup(t *testing.T, version string, dialingPeerName string) (*Cluster, *api.Client, libservice.Service) {
|
||||||
|
opts := libagent.BuildOptions{
|
||||||
|
Datacenter: "dc2",
|
||||||
|
InjectAutoEncryption: true,
|
||||||
|
InjectGossipEncryption: true,
|
||||||
|
ConsulVersion: version,
|
||||||
|
}
|
||||||
|
ctx, err := libagent.NewBuildContext(opts)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
conf, err := libagent.NewConfigBuilder(ctx).
|
||||||
|
Peering(true).
|
||||||
|
ToAgentConfig()
|
||||||
|
require.NoError(t, err)
|
||||||
|
t.Logf("dc2 server config: \n%s", conf.JSON)
|
||||||
|
|
||||||
|
configs := []libagent.Config{*conf}
|
||||||
|
|
||||||
|
cluster, err := New(configs)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
node := cluster.Agents[0]
|
||||||
|
client := node.GetClient()
|
||||||
|
WaitForLeader(t, cluster, client)
|
||||||
|
WaitForMembers(t, client, 1)
|
||||||
|
|
||||||
|
// Default Proxy Settings
|
||||||
|
ok, err := utils.ApplyDefaultProxySettings(client)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.True(t, ok)
|
||||||
|
|
||||||
|
// Create the mesh gateway for dataplane traffic
|
||||||
|
_, err = libservice.NewGatewayService(context.Background(), "mesh", "mesh", node)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
// Create a service and proxy instance
|
||||||
|
clientProxyService, err := libservice.CreateAndRegisterStaticClientSidecar(node, dialingPeerName, true)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
libassert.CatalogServiceExists(t, client, "static-client-sidecar-proxy")
|
||||||
|
|
||||||
|
return cluster, client, clientProxyService
|
||||||
|
}
|
|
@ -53,10 +53,14 @@ func (c ConnectContainer) Terminate() error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
err := c.container.StopLogProducer()
|
var err error
|
||||||
|
if *utils.FollowLog {
|
||||||
if err1 := c.container.Terminate(c.ctx); err == nil {
|
err := c.container.StopLogProducer()
|
||||||
err = err1
|
if err1 := c.container.Terminate(c.ctx); err == nil {
|
||||||
|
err = err1
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
err = c.container.Terminate(c.ctx)
|
||||||
}
|
}
|
||||||
|
|
||||||
c.container = nil
|
c.container = nil
|
||||||
|
@ -122,12 +126,14 @@ func NewConnectService(ctx context.Context, name string, serviceName string, ser
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := container.StartLogProducer(ctx); err != nil {
|
if *utils.FollowLog {
|
||||||
return nil, err
|
if err := container.StartLogProducer(ctx); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
container.FollowOutput(&LogConsumer{
|
||||||
|
Prefix: containerName,
|
||||||
|
})
|
||||||
}
|
}
|
||||||
container.FollowOutput(&LogConsumer{
|
|
||||||
Prefix: containerName,
|
|
||||||
})
|
|
||||||
|
|
||||||
// Register the termination function the agent so the containers can stop together
|
// Register the termination function the agent so the containers can stop together
|
||||||
terminate := func() error {
|
terminate := func() error {
|
||||||
|
|
|
@ -49,10 +49,14 @@ func (c exampleContainer) Terminate() error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
err := c.container.StopLogProducer()
|
var err error
|
||||||
|
if *utils.FollowLog {
|
||||||
if err1 := c.container.Terminate(c.ctx); err == nil {
|
err = c.container.StopLogProducer()
|
||||||
err = err1
|
if err1 := c.container.Terminate(c.ctx); err1 == nil {
|
||||||
|
err = err1
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
err = c.container.Terminate(c.ctx)
|
||||||
}
|
}
|
||||||
|
|
||||||
c.container = nil
|
c.container = nil
|
||||||
|
@ -97,12 +101,14 @@ func NewExampleService(ctx context.Context, name string, httpPort int, grpcPort
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := container.StartLogProducer(ctx); err != nil {
|
if *utils.FollowLog {
|
||||||
return nil, err
|
if err := container.StartLogProducer(ctx); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
container.FollowOutput(&LogConsumer{
|
||||||
|
Prefix: containerName,
|
||||||
|
})
|
||||||
}
|
}
|
||||||
container.FollowOutput(&LogConsumer{
|
|
||||||
Prefix: containerName,
|
|
||||||
})
|
|
||||||
|
|
||||||
terminate := func() error {
|
terminate := func() error {
|
||||||
return container.Terminate(context.Background())
|
return container.Terminate(context.Background())
|
||||||
|
|
|
@ -47,10 +47,14 @@ func (c gatewayContainer) Terminate() error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
err := c.container.StopLogProducer()
|
var err error
|
||||||
|
if *utils.FollowLog {
|
||||||
if err1 := c.container.Terminate(c.ctx); err == nil {
|
err = c.container.StopLogProducer()
|
||||||
err = err1
|
if err1 := c.container.Terminate(c.ctx); err == nil {
|
||||||
|
err = err1
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
err = c.container.Terminate(c.ctx)
|
||||||
}
|
}
|
||||||
|
|
||||||
c.container = nil
|
c.container = nil
|
||||||
|
@ -112,12 +116,14 @@ func NewGatewayService(ctx context.Context, name string, kind string, node libno
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := container.StartLogProducer(ctx); err != nil {
|
if *utils.FollowLog {
|
||||||
return nil, err
|
if err := container.StartLogProducer(ctx); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
container.FollowOutput(&LogConsumer{
|
||||||
|
Prefix: containerName,
|
||||||
|
})
|
||||||
}
|
}
|
||||||
container.FollowOutput(&LogConsumer{
|
|
||||||
Prefix: containerName,
|
|
||||||
})
|
|
||||||
|
|
||||||
terminate := func() error {
|
terminate := func() error {
|
||||||
return container.Terminate(context.Background())
|
return container.Terminate(context.Background())
|
||||||
|
|
|
@ -12,3 +12,4 @@ var TargetImage = flag.String("target-image", "consul", "docker image name to be
|
||||||
var TargetVersion = flag.String("target-version", "local", "docker image version to be used as UUT (unit under test)")
|
var TargetVersion = flag.String("target-version", "local", "docker image version to be used as UUT (unit under test)")
|
||||||
var LatestImage = flag.String("latest-image", "consul", "docker image name to be used under test (Default: consul)")
|
var LatestImage = flag.String("latest-image", "consul", "docker image name to be used under test (Default: consul)")
|
||||||
var LatestVersion = flag.String("latest-version", "1.11", "docker image to be used as latest")
|
var LatestVersion = flag.String("latest-version", "1.11", "docker image to be used as latest")
|
||||||
|
var FollowLog = flag.Bool("follow-log", true, "follow container log in output (Default: true)")
|
||||||
|
|
|
@ -3,7 +3,6 @@ package peering
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"encoding/pem"
|
"encoding/pem"
|
||||||
"fmt"
|
|
||||||
"sync"
|
"sync"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
@ -60,7 +59,7 @@ func TestPeering_RotateServerAndCAThenFail_(t *testing.T) {
|
||||||
|
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
go func() {
|
go func() {
|
||||||
acceptingCluster, acceptingClient, acceptingCtx = creatingAcceptingClusterAndSetup(t)
|
acceptingCluster, acceptingClient, acceptingCtx = libcluster.CreatingAcceptingClusterAndSetup(t, 3, *utils.TargetVersion, acceptingPeerName)
|
||||||
wg.Done()
|
wg.Done()
|
||||||
}()
|
}()
|
||||||
defer func() {
|
defer func() {
|
||||||
|
@ -69,7 +68,7 @@ func TestPeering_RotateServerAndCAThenFail_(t *testing.T) {
|
||||||
|
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
go func() {
|
go func() {
|
||||||
dialingCluster, dialingClient, clientSidecarService = createDialingClusterAndSetup(t)
|
dialingCluster, dialingClient, clientSidecarService = libcluster.CreateDialingClusterAndSetup(t, *utils.TargetVersion, dialingPeerName)
|
||||||
wg.Done()
|
wg.Done()
|
||||||
}()
|
}()
|
||||||
defer func() {
|
defer func() {
|
||||||
|
@ -78,17 +77,7 @@ func TestPeering_RotateServerAndCAThenFail_(t *testing.T) {
|
||||||
|
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
|
|
||||||
generateReq := api.PeeringGenerateTokenRequest{
|
err := dialingCluster.PeerWithCluster(acceptingClient, acceptingPeerName, dialingPeerName)
|
||||||
PeerName: acceptingPeerName,
|
|
||||||
}
|
|
||||||
generateRes, _, err := acceptingClient.Peerings().GenerateToken(context.Background(), generateReq, &api.WriteOptions{})
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
establishReq := api.PeeringEstablishRequest{
|
|
||||||
PeerName: dialingPeerName,
|
|
||||||
PeeringToken: generateRes.PeeringToken,
|
|
||||||
}
|
|
||||||
_, _, err = dialingClient.Peerings().Establish(context.Background(), establishReq, &api.WriteOptions{})
|
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
libassert.PeeringStatus(t, acceptingClient, acceptingPeerName, api.PeeringStateActive)
|
libassert.PeeringStatus(t, acceptingClient, acceptingPeerName, api.PeeringStateActive)
|
||||||
|
@ -199,132 +188,6 @@ func terminate(t *testing.T, cluster *libcluster.Cluster) {
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// creatingAcceptingClusterAndSetup creates a cluster with 3 servers and 1 client.
|
|
||||||
// It also creates and registers a service+sidecar.
|
|
||||||
// The API client returned is pointed at the client agent.
|
|
||||||
func creatingAcceptingClusterAndSetup(t *testing.T) (*libcluster.Cluster, *api.Client, *libagent.BuildContext) {
|
|
||||||
var configs []libagent.Config
|
|
||||||
|
|
||||||
opts := libagent.BuildOptions{
|
|
||||||
InjectAutoEncryption: true,
|
|
||||||
InjectGossipEncryption: true,
|
|
||||||
}
|
|
||||||
ctx, err := libagent.NewBuildContext(opts)
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
numServer := 3
|
|
||||||
for i := 0; i < numServer; i++ {
|
|
||||||
serverConf, err := libagent.NewConfigBuilder(ctx).
|
|
||||||
Bootstrap(3).
|
|
||||||
Peering(true).
|
|
||||||
RetryJoin(fmt.Sprintf("agent-%d", (i+1)%3)). // Round-robin join the servers
|
|
||||||
ToAgentConfig()
|
|
||||||
require.NoError(t, err)
|
|
||||||
t.Logf("dc1 server config %d: \n%s", i, serverConf.JSON)
|
|
||||||
|
|
||||||
configs = append(configs, *serverConf)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Add a stable client to register the service
|
|
||||||
clientConf, err := libagent.NewConfigBuilder(ctx).
|
|
||||||
Client().
|
|
||||||
Peering(true).
|
|
||||||
RetryJoin("agent-0", "agent-1", "agent-2").
|
|
||||||
ToAgentConfig()
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
t.Logf("dc1 client config: \n%s", clientConf.JSON)
|
|
||||||
|
|
||||||
configs = append(configs, *clientConf)
|
|
||||||
|
|
||||||
cluster, err := libcluster.New(configs)
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
// Use the client agent as the HTTP endpoint since we will not rotate it
|
|
||||||
clientNode := cluster.Agents[3]
|
|
||||||
client := clientNode.GetClient()
|
|
||||||
libcluster.WaitForLeader(t, cluster, client)
|
|
||||||
libcluster.WaitForMembers(t, client, 4)
|
|
||||||
|
|
||||||
// Default Proxy Settings
|
|
||||||
ok, err := utils.ApplyDefaultProxySettings(client)
|
|
||||||
require.NoError(t, err)
|
|
||||||
require.True(t, ok)
|
|
||||||
|
|
||||||
// Create the mesh gateway for dataplane traffic
|
|
||||||
_, err = libservice.NewGatewayService(context.Background(), "mesh", "mesh", clientNode)
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
// Create a service and proxy instance
|
|
||||||
_, _, err = libservice.CreateAndRegisterStaticServerAndSidecar(clientNode)
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
libassert.CatalogServiceExists(t, client, "static-server")
|
|
||||||
libassert.CatalogServiceExists(t, client, "static-server-sidecar-proxy")
|
|
||||||
|
|
||||||
// Export the service
|
|
||||||
config := &api.ExportedServicesConfigEntry{
|
|
||||||
Name: "default",
|
|
||||||
Services: []api.ExportedService{
|
|
||||||
{
|
|
||||||
Name: "static-server",
|
|
||||||
Consumers: []api.ServiceConsumer{
|
|
||||||
{Peer: acceptingPeerName},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
}
|
|
||||||
ok, _, err = client.ConfigEntries().Set(config, &api.WriteOptions{})
|
|
||||||
require.NoError(t, err)
|
|
||||||
require.True(t, ok)
|
|
||||||
|
|
||||||
return cluster, client, ctx
|
|
||||||
}
|
|
||||||
|
|
||||||
// createDialingClusterAndSetup creates a cluster for peering with a single dev agent
|
|
||||||
func createDialingClusterAndSetup(t *testing.T) (*libcluster.Cluster, *api.Client, libservice.Service) {
|
|
||||||
opts := libagent.BuildOptions{
|
|
||||||
Datacenter: "dc2",
|
|
||||||
InjectAutoEncryption: true,
|
|
||||||
InjectGossipEncryption: true,
|
|
||||||
}
|
|
||||||
ctx, err := libagent.NewBuildContext(opts)
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
conf, err := libagent.NewConfigBuilder(ctx).
|
|
||||||
Peering(true).
|
|
||||||
ToAgentConfig()
|
|
||||||
require.NoError(t, err)
|
|
||||||
t.Logf("dc2 server config: \n%s", conf.JSON)
|
|
||||||
|
|
||||||
configs := []libagent.Config{*conf}
|
|
||||||
|
|
||||||
cluster, err := libcluster.New(configs)
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
node := cluster.Agents[0]
|
|
||||||
client := node.GetClient()
|
|
||||||
libcluster.WaitForLeader(t, cluster, client)
|
|
||||||
libcluster.WaitForMembers(t, client, 1)
|
|
||||||
|
|
||||||
// Default Proxy Settings
|
|
||||||
ok, err := utils.ApplyDefaultProxySettings(client)
|
|
||||||
require.NoError(t, err)
|
|
||||||
require.True(t, ok)
|
|
||||||
|
|
||||||
// Create the mesh gateway for dataplane traffic
|
|
||||||
_, err = libservice.NewGatewayService(context.Background(), "mesh", "mesh", node)
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
// Create a service and proxy instance
|
|
||||||
clientProxyService, err := libservice.CreateAndRegisterStaticClientSidecar(node, dialingPeerName, true)
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
libassert.CatalogServiceExists(t, client, "static-client-sidecar-proxy")
|
|
||||||
|
|
||||||
return cluster, client, clientProxyService
|
|
||||||
}
|
|
||||||
|
|
||||||
// rotateServer add a new server agent to the cluster, then forces the prior agent to leave.
|
// rotateServer add a new server agent to the cluster, then forces the prior agent to leave.
|
||||||
func rotateServer(t *testing.T, cluster *libcluster.Cluster, client *api.Client, ctx *libagent.BuildContext, node libagent.Agent) {
|
func rotateServer(t *testing.T, cluster *libcluster.Cluster, client *api.Client, ctx *libagent.BuildContext, node libagent.Agent) {
|
||||||
conf, err := libagent.NewConfigBuilder(ctx).
|
conf, err := libagent.NewConfigBuilder(ctx).
|
||||||
|
|
|
@ -1,8 +1,11 @@
|
||||||
# Consul Upgrade Integration tests
|
# Consul Upgrade Integration tests
|
||||||
## Local run
|
## Local run
|
||||||
- run `make dev-docker`
|
- run `make dev-docker`
|
||||||
- run the tests.
|
- run the tests, e.g., `go test -run ^TestBasicConnectService$ ./test/basic -v`
|
||||||
|
|
||||||
To specify targets and latest image pass `target-version` and `latest-version` to the tests. By default, it uses the `consul` docker image with respectively `local` and `latest` tags.
|
To specify targets and latest image pass `target-version` and `latest-version` to the tests. By default, it uses the `consul` docker image with respectively `local` and `latest` tags.
|
||||||
|
|
||||||
To use dev consul image, pass `target-image` and `target-version`, `-target-image hashicorppreview/consul -target-version 1.14-dev`.
|
To use dev consul image, pass `target-image` and `target-version`, `-target-image hashicorppreview/consul -target-version 1.14-dev`.
|
||||||
|
|
||||||
|
By default, all container's logs are written to either `stdout`, or `stderr`; this makes it hard to debug, when the test case creates many
|
||||||
|
containers. To disable following container logs, run the test with `-follow-log false`.
|
||||||
|
|
|
@ -0,0 +1,93 @@
|
||||||
|
package upgrade
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"sync"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
|
||||||
|
"github.com/hashicorp/consul/api"
|
||||||
|
libassert "github.com/hashicorp/consul/test/integration/consul-container/libs/assert"
|
||||||
|
libcluster "github.com/hashicorp/consul/test/integration/consul-container/libs/cluster"
|
||||||
|
"github.com/hashicorp/consul/test/integration/consul-container/libs/utils"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
acceptingPeerName = "accepting-to-dialer"
|
||||||
|
dialingPeerName = "dialing-to-acceptor"
|
||||||
|
)
|
||||||
|
|
||||||
|
// TestPeering_UpgradeToTarget_fromLatest checks peering status after dialing cluster
|
||||||
|
// and accepting cluster upgrade
|
||||||
|
func TestPeering_UpgradeToTarget_fromLatest(t *testing.T) {
|
||||||
|
type testcase struct {
|
||||||
|
oldversion string
|
||||||
|
targetVersion string
|
||||||
|
}
|
||||||
|
tcs := []testcase{
|
||||||
|
// {
|
||||||
|
// TODO: API changed from 1.13 to 1.14 in , PeerName to Peer
|
||||||
|
// exportConfigEntry
|
||||||
|
// oldversion: "1.13",
|
||||||
|
// targetVersion: *utils.TargetVersion,
|
||||||
|
// },
|
||||||
|
{
|
||||||
|
oldversion: "1.14",
|
||||||
|
targetVersion: *utils.TargetVersion,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
run := func(t *testing.T, tc testcase) {
|
||||||
|
var acceptingCluster, dialingCluster *libcluster.Cluster
|
||||||
|
var acceptingClient, dialingClient *api.Client
|
||||||
|
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
|
||||||
|
wg.Add(1)
|
||||||
|
go func() {
|
||||||
|
acceptingCluster, acceptingClient, _ = libcluster.CreatingAcceptingClusterAndSetup(t, 3, tc.oldversion, acceptingPeerName)
|
||||||
|
wg.Done()
|
||||||
|
}()
|
||||||
|
defer func() {
|
||||||
|
terminate(t, acceptingCluster)
|
||||||
|
}()
|
||||||
|
|
||||||
|
wg.Add(1)
|
||||||
|
go func() {
|
||||||
|
dialingCluster, dialingClient, _ = libcluster.CreateDialingClusterAndSetup(t, tc.oldversion, dialingPeerName)
|
||||||
|
wg.Done()
|
||||||
|
}()
|
||||||
|
defer func() {
|
||||||
|
terminate(t, dialingCluster)
|
||||||
|
}()
|
||||||
|
wg.Wait()
|
||||||
|
|
||||||
|
err := dialingCluster.PeerWithCluster(acceptingClient, acceptingPeerName, dialingPeerName)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
libassert.PeeringStatus(t, acceptingClient, acceptingPeerName, api.PeeringStateActive)
|
||||||
|
libassert.PeeringExports(t, acceptingClient, acceptingPeerName, 1)
|
||||||
|
|
||||||
|
// Upgrade the dialingCluster cluster and assert peering is still ACTIVE
|
||||||
|
err = dialingCluster.StandardUpgrade(t, context.Background(), tc.targetVersion)
|
||||||
|
require.NoError(t, err)
|
||||||
|
libassert.PeeringStatus(t, dialingClient, dialingPeerName, api.PeeringStateActive)
|
||||||
|
|
||||||
|
// Upgrade the accepting cluster and assert peering is still ACTIVE
|
||||||
|
err = acceptingCluster.StandardUpgrade(t, context.Background(), tc.targetVersion)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
libassert.PeeringStatus(t, acceptingClient, acceptingPeerName, api.PeeringStateActive)
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tc := range tcs {
|
||||||
|
t.Run(fmt.Sprintf("upgrade from %s to %s", tc.oldversion, tc.targetVersion),
|
||||||
|
func(t *testing.T) {
|
||||||
|
run(t, tc)
|
||||||
|
})
|
||||||
|
time.Sleep(3 * time.Second)
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue