mirror of
https://github.com/status-im/consul.git
synced 2025-02-02 08:56:43 +00:00
b4151780d6
1. Upgraded agent can inherit the persisted token and join the cluster 2. Agent token prior to upgrade is still valid after upgrade 3. Enable ACL in the agent configuration
650 lines
17 KiB
Go
650 lines
17 KiB
Go
package cluster
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"io"
|
|
"os"
|
|
"path/filepath"
|
|
"strconv"
|
|
"time"
|
|
|
|
goretry "github.com/avast/retry-go"
|
|
dockercontainer "github.com/docker/docker/api/types/container"
|
|
"github.com/hashicorp/go-multierror"
|
|
"github.com/pkg/errors"
|
|
"github.com/testcontainers/testcontainers-go"
|
|
"github.com/testcontainers/testcontainers-go/wait"
|
|
|
|
"github.com/hashicorp/consul/api"
|
|
|
|
"github.com/hashicorp/consul/test/integration/consul-container/libs/utils"
|
|
)
|
|
|
|
const bootLogLine = "Consul agent running"
|
|
const disableRYUKEnv = "TESTCONTAINERS_RYUK_DISABLED"
|
|
|
|
// Exposed ports info
|
|
const MaxEnvoyOnNode = 10 // the max number of Envoy sidecar can run along with the agent, base is 19000
|
|
const ServiceUpstreamLocalBindPort = 5000 // local bind Port of service's upstream
|
|
|
|
// consulContainerNode implements the Agent interface by running a Consul agent
|
|
// in a container.
|
|
type consulContainerNode struct {
|
|
ctx context.Context
|
|
pod testcontainers.Container
|
|
container testcontainers.Container
|
|
serverMode bool
|
|
datacenter string
|
|
config Config
|
|
podReq testcontainers.ContainerRequest
|
|
consulReq testcontainers.ContainerRequest
|
|
dataDir string
|
|
network string
|
|
id int
|
|
name string
|
|
terminateFuncs []func() error
|
|
|
|
client *api.Client
|
|
clientAddr string
|
|
clientCACertFile string
|
|
ip string
|
|
|
|
nextAdminPortOffset int
|
|
nextConnectPortOffset int
|
|
|
|
info AgentInfo
|
|
}
|
|
|
|
func (c *consulContainerNode) GetPod() testcontainers.Container {
|
|
return c.pod
|
|
}
|
|
|
|
func (c *consulContainerNode) ClaimAdminPort() (int, error) {
|
|
if c.nextAdminPortOffset >= MaxEnvoyOnNode {
|
|
return 0, fmt.Errorf("running out of envoy admin port, max %d, already claimed %d",
|
|
MaxEnvoyOnNode, c.nextAdminPortOffset)
|
|
}
|
|
p := 19000 + c.nextAdminPortOffset
|
|
c.nextAdminPortOffset++
|
|
return p, nil
|
|
}
|
|
|
|
// NewConsulContainer starts a Consul agent in a container with the given config.
|
|
func NewConsulContainer(ctx context.Context, config Config, cluster *Cluster) (Agent, error) {
|
|
network := cluster.NetworkName
|
|
index := cluster.Index
|
|
if config.ScratchDir == "" {
|
|
return nil, fmt.Errorf("ScratchDir is required")
|
|
}
|
|
|
|
license, err := readLicense()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
pc, err := readSomeConfigFileFields(config.JSON)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
consulType := "client"
|
|
if pc.Server {
|
|
consulType = "server"
|
|
}
|
|
name := utils.RandName(fmt.Sprintf("%s-consul-%s-%d", pc.Datacenter, consulType, index))
|
|
|
|
// Inject new Agent name
|
|
config.Cmd = append(config.Cmd, "-node", name)
|
|
|
|
tmpDirData := filepath.Join(config.ScratchDir, "data")
|
|
if err := os.MkdirAll(tmpDirData, 0777); err != nil {
|
|
return nil, fmt.Errorf("error creating data directory %s: %w", tmpDirData, err)
|
|
}
|
|
if err := os.Chmod(tmpDirData, 0777); err != nil {
|
|
return nil, fmt.Errorf("error chowning data directory %s: %w", tmpDirData, err)
|
|
}
|
|
|
|
var caCertFileForAPI string
|
|
if config.CACert != "" {
|
|
caCertFileForAPI = filepath.Join(config.ScratchDir, "ca.pem")
|
|
if err := os.WriteFile(caCertFileForAPI, []byte(config.CACert), 0644); err != nil {
|
|
return nil, fmt.Errorf("error writing out CA cert %s: %w", caCertFileForAPI, err)
|
|
}
|
|
}
|
|
|
|
configFile, err := createConfigFile(config.ScratchDir, config.JSON)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("error writing out config file %s: %w", configFile, err)
|
|
}
|
|
|
|
opts := containerOpts{
|
|
name: name,
|
|
configFile: configFile,
|
|
dataDir: tmpDirData,
|
|
license: license,
|
|
addtionalNetworks: []string{"bridge", network},
|
|
hostname: fmt.Sprintf("agent-%d", index),
|
|
}
|
|
podReq, consulReq := newContainerRequest(config, opts)
|
|
|
|
// Do some trickery to ensure that partial completion is correctly torn
|
|
// down, but successful execution is not.
|
|
var deferClean utils.ResettableDefer
|
|
defer deferClean.Execute()
|
|
|
|
podContainer, err := startContainer(ctx, podReq)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("error starting pod with image %q: %w", podReq.Image, err)
|
|
}
|
|
deferClean.Add(func() {
|
|
_ = podContainer.Terminate(ctx)
|
|
})
|
|
|
|
var (
|
|
httpPort = pc.Ports.HTTP
|
|
httpsPort = pc.Ports.HTTPS
|
|
|
|
clientAddr string
|
|
clientCACertFile string
|
|
|
|
info AgentInfo
|
|
)
|
|
if httpPort > 0 {
|
|
uri, err := podContainer.PortEndpoint(ctx, "8500", "http")
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
clientAddr = uri
|
|
|
|
} else if httpsPort > 0 {
|
|
uri, err := podContainer.PortEndpoint(ctx, "8501", "https")
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
clientAddr = uri
|
|
|
|
clientCACertFile = caCertFileForAPI
|
|
|
|
} else {
|
|
if pc.Server {
|
|
return nil, fmt.Errorf("server container does not expose HTTP or HTTPS")
|
|
}
|
|
}
|
|
|
|
if caCertFileForAPI != "" {
|
|
if config.UseAPIWithTLS {
|
|
if pc.Ports.HTTPS > 0 {
|
|
info.UseTLSForAPI = true
|
|
} else {
|
|
return nil, fmt.Errorf("UseAPIWithTLS is set but ports.https is not for this agent")
|
|
}
|
|
}
|
|
if config.UseGRPCWithTLS {
|
|
if pc.Ports.GRPCTLS > 0 {
|
|
info.UseTLSForGRPC = true
|
|
} else {
|
|
return nil, fmt.Errorf("UseGRPCWithTLS is set but ports.grpc_tls is not for this agent")
|
|
}
|
|
}
|
|
info.CACertFile = clientCACertFile
|
|
}
|
|
|
|
ip, err := podContainer.ContainerIP(ctx)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
consulContainer, err := startContainer(ctx, consulReq)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("error starting main with image %q: %w", consulReq.Image, err)
|
|
}
|
|
deferClean.Add(func() {
|
|
_ = consulContainer.Terminate(ctx)
|
|
})
|
|
|
|
if utils.FollowLog {
|
|
if err := consulContainer.StartLogProducer(ctx); err != nil {
|
|
return nil, err
|
|
}
|
|
deferClean.Add(func() {
|
|
_ = consulContainer.StopLogProducer()
|
|
})
|
|
|
|
if config.LogConsumer != nil {
|
|
consulContainer.FollowOutput(config.LogConsumer)
|
|
} else {
|
|
consulContainer.FollowOutput(&LogConsumer{
|
|
Prefix: opts.name,
|
|
})
|
|
}
|
|
}
|
|
|
|
node := &consulContainerNode{
|
|
config: config,
|
|
pod: podContainer,
|
|
container: consulContainer,
|
|
serverMode: pc.Server,
|
|
datacenter: pc.Datacenter,
|
|
ctx: ctx,
|
|
podReq: podReq,
|
|
consulReq: consulReq,
|
|
dataDir: tmpDirData,
|
|
network: network,
|
|
id: index,
|
|
name: name,
|
|
ip: ip,
|
|
info: info,
|
|
}
|
|
|
|
if httpPort > 0 || httpsPort > 0 {
|
|
apiConfig := api.DefaultConfig()
|
|
apiConfig.Address = clientAddr
|
|
if clientCACertFile != "" {
|
|
apiConfig.TLSConfig.CAFile = clientCACertFile
|
|
}
|
|
|
|
if cluster.TokenBootstrap != "" {
|
|
apiConfig.Token = cluster.TokenBootstrap
|
|
}
|
|
apiClient, err := api.NewClient(apiConfig)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
node.client = apiClient
|
|
node.clientAddr = clientAddr
|
|
node.clientCACertFile = clientCACertFile
|
|
}
|
|
|
|
// Inject node token if ACL is enabled and the bootstrap token is generated
|
|
if cluster.TokenBootstrap != "" && cluster.ACLEnabled {
|
|
agentToken, err := cluster.CreateAgentToken(pc.Datacenter, name)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
cmd := []string{"consul", "acl", "set-agent-token",
|
|
"-token", cluster.TokenBootstrap,
|
|
"agent", agentToken}
|
|
|
|
// retry in case agent has not fully initialized
|
|
err = goretry.Do(
|
|
func() error {
|
|
_, err := node.Exec(context.Background(), cmd)
|
|
if err != nil {
|
|
return fmt.Errorf("error setting the agent token, error %s", err)
|
|
}
|
|
return nil
|
|
},
|
|
goretry.Delay(time.Second*1),
|
|
)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("error setting agent token: %s", err)
|
|
}
|
|
}
|
|
|
|
// disable cleanup functions now that we have an object with a Terminate() function
|
|
deferClean.Reset()
|
|
|
|
return node, nil
|
|
}
|
|
|
|
func (c *consulContainerNode) GetName() string {
|
|
if c.container == nil {
|
|
return c.consulReq.Name // TODO: is this safe to do all the time?
|
|
}
|
|
name, err := c.container.Name(c.ctx)
|
|
if err != nil {
|
|
return ""
|
|
}
|
|
return name
|
|
}
|
|
|
|
func (c *consulContainerNode) GetAgentName() string {
|
|
return c.name
|
|
}
|
|
|
|
func (c *consulContainerNode) GetConfig() Config {
|
|
return c.config.Clone()
|
|
}
|
|
|
|
func (c *consulContainerNode) GetDatacenter() string {
|
|
return c.datacenter
|
|
}
|
|
|
|
func (c *consulContainerNode) IsServer() bool {
|
|
return c.serverMode
|
|
}
|
|
|
|
// GetClient returns an API client that can be used to communicate with the Agent.
|
|
func (c *consulContainerNode) GetClient() *api.Client {
|
|
return c.client
|
|
}
|
|
|
|
// NewClient returns an API client by making a new one based on the provided token
|
|
// - updateDefault: if true update the default client
|
|
func (c *consulContainerNode) NewClient(token string, updateDefault bool) (*api.Client, error) {
|
|
apiConfig := api.DefaultConfig()
|
|
apiConfig.Address = c.clientAddr
|
|
if c.clientCACertFile != "" {
|
|
apiConfig.TLSConfig.CAFile = c.clientCACertFile
|
|
}
|
|
|
|
if token != "" {
|
|
apiConfig.Token = token
|
|
}
|
|
apiClient, err := api.NewClient(apiConfig)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if updateDefault {
|
|
c.client = apiClient
|
|
}
|
|
return apiClient, nil
|
|
}
|
|
|
|
func (c *consulContainerNode) GetAPIAddrInfo() (addr, caCert string) {
|
|
return c.clientAddr, c.clientCACertFile
|
|
}
|
|
|
|
func (c *consulContainerNode) GetInfo() AgentInfo {
|
|
return c.info
|
|
}
|
|
|
|
func (c *consulContainerNode) GetIP() string {
|
|
return c.ip
|
|
}
|
|
|
|
func (c *consulContainerNode) RegisterTermination(f func() error) {
|
|
c.terminateFuncs = append(c.terminateFuncs, f)
|
|
}
|
|
|
|
func (c *consulContainerNode) Exec(ctx context.Context, cmd []string) (string, error) {
|
|
exitcode, reader, err := c.container.Exec(ctx, cmd)
|
|
if exitcode != 0 {
|
|
return "", fmt.Errorf("exec with exit code %d", exitcode)
|
|
}
|
|
if err != nil {
|
|
return "", fmt.Errorf("exec with error %s", err)
|
|
}
|
|
|
|
buf, err := io.ReadAll(reader)
|
|
if err != nil {
|
|
return "", fmt.Errorf("error reading from exe output: %s", err)
|
|
}
|
|
|
|
return string(buf), err
|
|
}
|
|
|
|
func (c *consulContainerNode) Upgrade(ctx context.Context, config Config) error {
|
|
if config.ScratchDir == "" {
|
|
return fmt.Errorf("ScratchDir is required")
|
|
}
|
|
|
|
newConfigFile, err := createConfigFile(config.ScratchDir, config.JSON)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// We'll keep the same pod.
|
|
opts := containerOpts{
|
|
name: c.consulReq.Name,
|
|
configFile: newConfigFile,
|
|
dataDir: c.dataDir,
|
|
license: "",
|
|
addtionalNetworks: []string{"bridge", c.network},
|
|
hostname: c.consulReq.Hostname,
|
|
}
|
|
_, consulReq2 := newContainerRequest(config, opts)
|
|
consulReq2.Env = c.consulReq.Env // copy license
|
|
|
|
// sanity check two fields
|
|
if consulReq2.Name != c.consulReq.Name {
|
|
return fmt.Errorf("new name %q should match old name %q", consulReq2.Name, c.consulReq.Name)
|
|
}
|
|
if consulReq2.Hostname != c.consulReq.Hostname {
|
|
return fmt.Errorf("new hostname %q should match old hostname %q", consulReq2.Hostname, c.consulReq.Hostname)
|
|
}
|
|
|
|
if err := c.TerminateAndRetainPod(true); err != nil {
|
|
return fmt.Errorf("error terminating running container during upgrade: %w", err)
|
|
}
|
|
|
|
c.consulReq = consulReq2
|
|
|
|
container, err := startContainer(ctx, c.consulReq)
|
|
c.ctx = ctx
|
|
c.container = container
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if utils.FollowLog {
|
|
if err := container.StartLogProducer(ctx); err != nil {
|
|
return err
|
|
}
|
|
container.FollowOutput(&LogConsumer{
|
|
Prefix: opts.name,
|
|
})
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// Terminate attempts to terminate the agent container.
|
|
// This might also include running termination functions for containers associated with the agent.
|
|
// On failure, an error will be returned and the reaper process (RYUK) will handle cleanup.
|
|
func (c *consulContainerNode) Terminate() error {
|
|
return c.terminate(false, false)
|
|
}
|
|
func (c *consulContainerNode) TerminateAndRetainPod(skipFuncs bool) error {
|
|
return c.terminate(true, skipFuncs)
|
|
}
|
|
func (c *consulContainerNode) terminate(retainPod bool, skipFuncs bool) error {
|
|
// Services might register a termination function that should also fire
|
|
// when the "agent" is cleaned up.
|
|
// If skipFuncs is tru, We skip the terminateFuncs of connect sidecar, e.g.,
|
|
// during upgrade
|
|
if !skipFuncs {
|
|
for _, f := range c.terminateFuncs {
|
|
err := f()
|
|
if err != nil {
|
|
continue
|
|
}
|
|
}
|
|
}
|
|
|
|
var merr error
|
|
if c.container != nil {
|
|
if err := TerminateContainer(c.ctx, c.container, true); err != nil {
|
|
merr = multierror.Append(merr, err)
|
|
}
|
|
c.container = nil
|
|
}
|
|
|
|
if !retainPod && c.pod != nil {
|
|
if err := TerminateContainer(c.ctx, c.pod, false); err != nil {
|
|
merr = multierror.Append(merr, err)
|
|
}
|
|
|
|
c.pod = nil
|
|
}
|
|
|
|
return merr
|
|
}
|
|
|
|
func (c *consulContainerNode) DataDir() string {
|
|
return c.dataDir
|
|
}
|
|
|
|
func startContainer(ctx context.Context, req testcontainers.ContainerRequest) (testcontainers.Container, error) {
|
|
ctx, cancel := context.WithTimeout(ctx, time.Second*40)
|
|
defer cancel()
|
|
return testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{
|
|
ContainerRequest: req,
|
|
Started: true,
|
|
})
|
|
}
|
|
|
|
const pauseImage = "k8s.gcr.io/pause:3.3"
|
|
|
|
type containerOpts struct {
|
|
configFile string
|
|
dataDir string
|
|
hostname string
|
|
index int
|
|
license string
|
|
name string
|
|
addtionalNetworks []string
|
|
}
|
|
|
|
func newContainerRequest(config Config, opts containerOpts) (podRequest, consulRequest testcontainers.ContainerRequest) {
|
|
skipReaper := isRYUKDisabled()
|
|
|
|
pod := testcontainers.ContainerRequest{
|
|
Image: pauseImage,
|
|
AutoRemove: false,
|
|
Name: opts.name + "-pod",
|
|
SkipReaper: skipReaper,
|
|
ExposedPorts: []string{
|
|
"8500/tcp", // Consul HTTP API
|
|
"8501/tcp", // Consul HTTPs API
|
|
|
|
"8443/tcp", // Envoy Gateway Listener
|
|
|
|
"8079/tcp", // Envoy App Listener
|
|
"8080/tcp", // Envoy App Listener
|
|
"9998/tcp", // Envoy App Listener
|
|
"9999/tcp", // Envoy App Listener
|
|
},
|
|
Hostname: opts.hostname,
|
|
Networks: opts.addtionalNetworks,
|
|
}
|
|
|
|
// Envoy upstream listener
|
|
pod.ExposedPorts = append(pod.ExposedPorts, fmt.Sprintf("%d/tcp", ServiceUpstreamLocalBindPort))
|
|
|
|
// Reserve the exposed ports for Envoy admin port, e.g., 19000 - 19009
|
|
basePort := 19000
|
|
for i := 0; i < MaxEnvoyOnNode; i++ {
|
|
pod.ExposedPorts = append(pod.ExposedPorts, fmt.Sprintf("%d/tcp", basePort+i))
|
|
}
|
|
|
|
// For handshakes like auto-encrypt, it can take 10's of seconds for the agent to become "ready".
|
|
// If we only wait until the log stream starts, subsequent commands to agents will fail.
|
|
// TODO: optimize the wait strategy
|
|
app := testcontainers.ContainerRequest{
|
|
NetworkMode: dockercontainer.NetworkMode("container:" + opts.name + "-pod"),
|
|
Image: config.DockerImage(),
|
|
WaitingFor: wait.ForLog(bootLogLine).WithStartupTimeout(60 * time.Second), // See note above
|
|
AutoRemove: false,
|
|
Name: opts.name,
|
|
Mounts: []testcontainers.ContainerMount{
|
|
{
|
|
Source: testcontainers.DockerBindMountSource{HostPath: opts.configFile},
|
|
Target: "/consul/config/config.json",
|
|
ReadOnly: true,
|
|
},
|
|
{
|
|
Source: testcontainers.DockerBindMountSource{HostPath: opts.dataDir},
|
|
Target: "/consul/data",
|
|
},
|
|
},
|
|
Cmd: config.Cmd,
|
|
SkipReaper: skipReaper,
|
|
Env: map[string]string{"CONSUL_LICENSE": opts.license},
|
|
}
|
|
|
|
if config.CertVolume != "" {
|
|
app.Mounts = append(app.Mounts, testcontainers.ContainerMount{
|
|
Source: testcontainers.DockerVolumeMountSource{
|
|
Name: config.CertVolume,
|
|
},
|
|
Target: "/consul/config/certs",
|
|
ReadOnly: true,
|
|
})
|
|
}
|
|
|
|
// fmt.Printf("app: %s\n", utils.Dump(app))
|
|
|
|
return pod, app
|
|
}
|
|
|
|
// isRYUKDisabled returns whether the reaper process (RYUK) has been disabled
|
|
// by an environment variable.
|
|
//
|
|
// https://github.com/testcontainers/moby-ryuk
|
|
func isRYUKDisabled() bool {
|
|
skipReaperStr := os.Getenv(disableRYUKEnv)
|
|
skipReaper, err := strconv.ParseBool(skipReaperStr)
|
|
if err != nil {
|
|
return false
|
|
}
|
|
return skipReaper
|
|
}
|
|
|
|
func readLicense() (string, error) {
|
|
if license := os.Getenv("CONSUL_LICENSE"); license != "" {
|
|
return license, nil
|
|
}
|
|
|
|
licensePath := os.Getenv("CONSUL_LICENSE_PATH")
|
|
if licensePath == "" {
|
|
return "", nil
|
|
}
|
|
|
|
licenseBytes, err := os.ReadFile(licensePath)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
return string(licenseBytes), nil
|
|
}
|
|
|
|
func createConfigFile(scratchDir string, JSON string) (string, error) {
|
|
configDir := filepath.Join(scratchDir, "config")
|
|
|
|
if err := os.MkdirAll(configDir, 0777); err != nil {
|
|
return "", err
|
|
}
|
|
if err := os.Chmod(configDir, 0777); err != nil {
|
|
return "", err
|
|
}
|
|
|
|
configFile := filepath.Join(configDir, "config.hcl")
|
|
|
|
if err := os.WriteFile(configFile, []byte(JSON), 0644); err != nil {
|
|
return "", err
|
|
}
|
|
return configFile, nil
|
|
}
|
|
|
|
type parsedConfig struct {
|
|
Datacenter string `json:"datacenter"`
|
|
Server bool `json:"server"`
|
|
Ports parsedPorts `json:"ports"`
|
|
}
|
|
|
|
type parsedPorts struct {
|
|
DNS int `json:"dns"`
|
|
HTTP int `json:"http"`
|
|
HTTPS int `json:"https"`
|
|
GRPC int `json:"grpc"`
|
|
GRPCTLS int `json:"grpc_tls"`
|
|
SerfLAN int `json:"serf_lan"`
|
|
SerfWAN int `json:"serf_wan"`
|
|
Server int `json:"server"`
|
|
}
|
|
|
|
func readSomeConfigFileFields(JSON string) (parsedConfig, error) {
|
|
var pc parsedConfig
|
|
if err := json.Unmarshal([]byte(JSON), &pc); err != nil {
|
|
return pc, errors.Wrap(err, "failed to parse config file")
|
|
}
|
|
if pc.Datacenter == "" {
|
|
pc.Datacenter = "dc1"
|
|
}
|
|
return pc, nil
|
|
}
|