Revert "discover: move instance discover code into separate package (#3144)" (#3180)

This reverts commit 26bfb2d00a.
This commit is contained in:
James Phillips 2017-06-23 01:38:55 -07:00 committed by GitHub
parent 6d354b4b9f
commit 42f60b04bb
415 changed files with 22779 additions and 8626 deletions

73
agent/config_aws.go Normal file
View File

@ -0,0 +1,73 @@
package agent
import (
"log"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/defaults"
"github.com/aws/aws-sdk-go/aws/ec2metadata"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/ec2"
)
// discoverEc2Hosts searches an AWS region, returning a list of instance ips
// where EC2TagKey = EC2TagValue
func (c *Config) discoverEc2Hosts(logger *log.Logger) ([]string, error) {
config := c.RetryJoinEC2
ec2meta := ec2metadata.New(session.New())
if config.Region == "" {
logger.Printf("[INFO] agent: No EC2 region provided, querying instance metadata endpoint...")
identity, err := ec2meta.GetInstanceIdentityDocument()
if err != nil {
return nil, err
}
config.Region = identity.Region
}
awsConfig := &aws.Config{
Region: &config.Region,
Credentials: credentials.NewChainCredentials(
[]credentials.Provider{
&credentials.StaticProvider{
Value: credentials.Value{
AccessKeyID: config.AccessKeyID,
SecretAccessKey: config.SecretAccessKey,
},
},
&credentials.EnvProvider{},
&credentials.SharedCredentialsProvider{},
defaults.RemoteCredProvider(*(defaults.Config()), defaults.Handlers()),
}),
}
svc := ec2.New(session.New(), awsConfig)
resp, err := svc.DescribeInstances(&ec2.DescribeInstancesInput{
Filters: []*ec2.Filter{
{
Name: aws.String("tag:" + config.TagKey),
Values: []*string{
aws.String(config.TagValue),
},
},
},
})
if err != nil {
return nil, err
}
var servers []string
for i := range resp.Reservations {
for _, instance := range resp.Reservations[i].Instances {
// Terminated instances don't have the PrivateIpAddress field
if instance.PrivateIpAddress != nil {
servers = append(servers, *instance.PrivateIpAddress)
}
}
}
return servers, nil
}

59
agent/config_azure.go Normal file
View File

@ -0,0 +1,59 @@
package agent
import (
"fmt"
"log"
"github.com/Azure/azure-sdk-for-go/arm/network"
"github.com/Azure/go-autorest/autorest"
"github.com/Azure/go-autorest/autorest/azure"
)
// discoverAzureHosts searches an Azure Subscription, returning a list of instance ips
// where AzureTag_Name = AzureTag_Value
func (c *Config) discoverAzureHosts(logger *log.Logger) ([]string, error) {
var servers []string
// Only works for the Azure PublicCLoud for now; no ability to test other Environment
oauthConfig, err := azure.PublicCloud.OAuthConfigForTenant(c.RetryJoinAzure.TenantID)
if err != nil {
return nil, err
}
// Get the ServicePrincipalToken for use searching the NetworkInterfaces
sbt, tokerr := azure.NewServicePrincipalToken(*oauthConfig,
c.RetryJoinAzure.ClientID,
c.RetryJoinAzure.SecretAccessKey,
azure.PublicCloud.ResourceManagerEndpoint,
)
if tokerr != nil {
return nil, tokerr
}
// Setup the client using autorest; followed the structure from Terraform
vmnet := network.NewInterfacesClient(c.RetryJoinAzure.SubscriptionID)
vmnet.Client.UserAgent = fmt.Sprint("Hashicorp-Consul")
vmnet.Authorizer = sbt
vmnet.Sender = autorest.CreateSender(autorest.WithLogging(logger))
// Get all Network interfaces across ResourceGroups unless there is a compelling reason to restrict
netres, neterr := vmnet.ListAll()
if neterr != nil {
return nil, neterr
}
// For now, ignore Primary interfaces, choose any PrivateIPAddress with the matching tags
for _, oneint := range *netres.Value {
// Make it a little more robust just in case there is actually no Tags
if oneint.Tags != nil {
if *(*oneint.Tags)[c.RetryJoinAzure.TagName] == c.RetryJoinAzure.TagValue {
// Make it a little more robust just in case IPConfigurations nil
if oneint.IPConfigurations != nil {
for _, onecfg := range *oneint.IPConfigurations {
// fmt.Println("Internal FQDN: ", *onecfg.Name, " IP: ", *onecfg.PrivateIPAddress)
// Only get the address if there is private IP address
if onecfg.PrivateIPAddress != nil {
servers = append(servers, *onecfg.PrivateIPAddress)
}
}
}
}
}
}
return servers, nil
}

View File

@ -0,0 +1,43 @@
package agent
import (
"log"
"os"
"testing"
)
func TestDiscoverAzureHosts(t *testing.T) {
subscriptionID := os.Getenv("ARM_SUBSCRIPTION_ID")
tenantID := os.Getenv("ARM_TENANT_ID")
clientID := os.Getenv("ARM_CLIENT_ID")
clientSecret := os.Getenv("ARM_CLIENT_SECRET")
environment := os.Getenv("ARM_ENVIRONMENT")
if subscriptionID == "" || clientID == "" || clientSecret == "" || tenantID == "" {
t.Skip("ARM_SUBSCRIPTION_ID, ARM_CLIENT_ID, ARM_CLIENT_SECRET and ARM_TENANT_ID " +
"must be set to test Discover Azure Hosts")
}
if environment == "" {
t.Log("Environments other than Public not supported at the moment")
}
c := &Config{
RetryJoinAzure: RetryJoinAzure{
SubscriptionID: subscriptionID,
ClientID: clientID,
SecretAccessKey: clientSecret,
TenantID: tenantID,
TagName: "type",
TagValue: "Foundation",
},
}
servers, err := c.discoverAzureHosts(log.New(os.Stderr, "", log.LstdFlags))
if err != nil {
t.Fatal(err)
}
if len(servers) != 3 {
t.Fatalf("bad: %v", servers)
}
}

38
agent/config_ec2_test.go Normal file
View File

@ -0,0 +1,38 @@
package agent
import (
"log"
"os"
"testing"
)
func TestDiscoverEC2Hosts(t *testing.T) {
t.Parallel()
if os.Getenv("AWS_REGION") == "" {
t.Skip("AWS_REGION not set, skipping")
}
if os.Getenv("AWS_ACCESS_KEY_ID") == "" {
t.Skip("AWS_ACCESS_KEY_ID not set, skipping")
}
if os.Getenv("AWS_SECRET_ACCESS_KEY") == "" {
t.Skip("AWS_SECRET_ACCESS_KEY not set, skipping")
}
c := &Config{
RetryJoinEC2: RetryJoinEC2{
Region: os.Getenv("AWS_REGION"),
TagKey: "ConsulRole",
TagValue: "Server",
},
}
servers, err := c.discoverEc2Hosts(&log.Logger{})
if err != nil {
t.Fatal(err)
}
if len(servers) != 3 {
t.Fatalf("bad: %v", servers)
}
}

159
agent/config_gce.go Normal file
View File

@ -0,0 +1,159 @@
package agent
import (
"context"
"fmt"
"io/ioutil"
"log"
"net/http"
"strings"
"golang.org/x/oauth2"
"golang.org/x/oauth2/google"
compute "google.golang.org/api/compute/v1"
)
// discoverGCEHosts searches a Google Compute Engine region, returning a list
// of instance ips that match the tags given in GCETags.
func (c *Config) discoverGCEHosts(logger *log.Logger) ([]string, error) {
config := c.RetryJoinGCE
ctx := oauth2.NoContext
var client *http.Client
var err error
logger.Printf("[INFO] agent: Initializing GCE client")
if config.CredentialsFile != "" {
logger.Printf("[INFO] agent: Loading credentials from %s", config.CredentialsFile)
key, err := ioutil.ReadFile(config.CredentialsFile)
if err != nil {
return nil, err
}
jwtConfig, err := google.JWTConfigFromJSON(key, compute.ComputeScope)
if err != nil {
return nil, err
}
client = jwtConfig.Client(ctx)
} else {
logger.Printf("[INFO] agent: Using default credential chain")
client, err = google.DefaultClient(ctx, compute.ComputeScope)
if err != nil {
return nil, err
}
}
computeService, err := compute.New(client)
if err != nil {
return nil, err
}
if config.ProjectName == "" {
logger.Printf("[INFO] agent: No GCE project provided, will discover from metadata.")
config.ProjectName, err = gceProjectIDFromMetadata(logger)
if err != nil {
return nil, err
}
} else {
logger.Printf("[INFO] agent: Using pre-defined GCE project name: %s", config.ProjectName)
}
zones, err := gceDiscoverZones(ctx, logger, computeService, config.ProjectName, config.ZonePattern)
if err != nil {
return nil, err
}
logger.Printf("[INFO] agent: Discovering GCE hosts with tag %s in zones: %s", config.TagValue, strings.Join(zones, ", "))
var servers []string
for _, zone := range zones {
addresses, err := gceInstancesAddressesForZone(ctx, logger, computeService, config.ProjectName, zone, config.TagValue)
if err != nil {
return nil, err
}
if len(addresses) > 0 {
logger.Printf("[INFO] agent: Discovered %d instances in %s/%s: %v", len(addresses), config.ProjectName, zone, addresses)
}
servers = append(servers, addresses...)
}
return servers, nil
}
// gceProjectIDFromMetadata queries the metadata service on GCE to get the
// project ID (name) of an instance.
func gceProjectIDFromMetadata(logger *log.Logger) (string, error) {
logger.Printf("[INFO] agent: Attempting to discover GCE project from metadata.")
client := &http.Client{}
req, err := http.NewRequest("GET", "http://metadata.google.internal/computeMetadata/v1/project/project-id", nil)
if err != nil {
return "", err
}
req.Header.Add("Metadata-Flavor", "Google")
resp, err := client.Do(req)
if err != nil {
return "", err
}
defer resp.Body.Close()
project, err := ioutil.ReadAll(resp.Body)
if err != nil {
return "", err
}
logger.Printf("[INFO] agent: GCE project discovered as: %s", project)
return string(project), nil
}
// gceDiscoverZones discovers a list of zones from a supplied zone pattern, or
// all of the zones available to a project.
func gceDiscoverZones(ctx context.Context, logger *log.Logger, computeService *compute.Service, project, pattern string) ([]string, error) {
var zones []string
if pattern != "" {
logger.Printf("[INFO] agent: Discovering zones for project %s matching pattern: %s", project, pattern)
} else {
logger.Printf("[INFO] agent: Discovering all zones available to project: %s", project)
}
call := computeService.Zones.List(project)
if pattern != "" {
call = call.Filter(fmt.Sprintf("name eq %s", pattern))
}
if err := call.Pages(ctx, func(page *compute.ZoneList) error {
for _, v := range page.Items {
zones = append(zones, v.Name)
}
return nil
}); err != nil {
return zones, err
}
logger.Printf("[INFO] agent: Discovered GCE zones: %s", strings.Join(zones, ", "))
return zones, nil
}
// gceInstancesAddressesForZone locates all instances within a specific project
// and zone, matching the supplied tag. Only the private IP addresses are
// returned, but ID is also logged.
func gceInstancesAddressesForZone(ctx context.Context, logger *log.Logger, computeService *compute.Service, project, zone, tag string) ([]string, error) {
var addresses []string
call := computeService.Instances.List(project, zone)
if err := call.Pages(ctx, func(page *compute.InstanceList) error {
for _, v := range page.Items {
for _, t := range v.Tags.Items {
if t == tag && len(v.NetworkInterfaces) > 0 && v.NetworkInterfaces[0].NetworkIP != "" {
addresses = append(addresses, v.NetworkInterfaces[0].NetworkIP)
}
}
}
return nil
}); err != nil {
return addresses, err
}
return addresses, nil
}

35
agent/config_gce_test.go Normal file
View File

@ -0,0 +1,35 @@
package agent
import (
"log"
"os"
"testing"
)
func TestDiscoverGCEHosts(t *testing.T) {
t.Parallel()
if os.Getenv("GCE_PROJECT") == "" {
t.Skip("GCE_PROJECT not set, skipping")
}
if os.Getenv("GOOGLE_APPLICATION_CREDENTIALS") == "" && os.Getenv("GCE_CONFIG_CREDENTIALS") == "" {
t.Skip("GOOGLE_APPLICATION_CREDENTIALS or GCE_CONFIG_CREDENTIALS not set, skipping")
}
c := &Config{
RetryJoinGCE: RetryJoinGCE{
ProjectName: os.Getenv("GCE_PROJECT"),
ZonePattern: os.Getenv("GCE_ZONE"),
TagValue: "consulrole-server",
CredentialsFile: os.Getenv("GCE_CONFIG_CREDENTIALS"),
},
}
servers, err := c.discoverGCEHosts(log.New(os.Stderr, "", log.LstdFlags))
if err != nil {
t.Fatal(err)
}
if len(servers) != 3 {
t.Fatalf("bad: %v", servers)
}
}

View File

@ -2,73 +2,65 @@ package agent
import (
"fmt"
"net/url"
"time"
"github.com/hashicorp/go-discover"
)
// RetryJoin is used to handle retrying a join until it succeeds or all
// retries are exhausted.
func (a *Agent) retryJoin() {
cfg := a.config
awscfg := cfg.RetryJoinEC2
azurecfg := cfg.RetryJoinAzure
gcecfg := cfg.RetryJoinGCE
q := url.QueryEscape
ec2Enabled := cfg.RetryJoinEC2.TagKey != "" && cfg.RetryJoinEC2.TagValue != ""
gceEnabled := cfg.RetryJoinGCE.TagValue != ""
azureEnabled := cfg.RetryJoinAzure.TagName != "" && cfg.RetryJoinAzure.TagValue != ""
if len(cfg.RetryJoin) == 0 && !ec2Enabled && !gceEnabled && !azureEnabled {
return
}
a.logger.Printf("[INFO] agent: Joining cluster...")
attempts := cfg.RetryMaxAttempts
attempt := 0
for {
args := ""
switch {
case awscfg.TagKey != "" && awscfg.TagValue != "":
args = fmt.Sprintf("provider=aws region=%s tag_key=%s tag_value=%s access_key_id=%s secret_access_key=%s",
q(awscfg.Region), q(awscfg.TagKey), q(awscfg.TagValue), q(awscfg.AccessKeyID), q(awscfg.SecretAccessKey))
case gcecfg.TagValue != "":
args = fmt.Sprintf("provider=gce project_name=%s zone_pattern=%s tag_value=%s credentials_file=%s",
q(gcecfg.ProjectName), q(gcecfg.ZonePattern), q(gcecfg.TagValue), q(gcecfg.CredentialsFile))
case azurecfg.TagName != "" && azurecfg.TagValue != "":
args = fmt.Sprintf("provider=azure tenant_id=%s subscription_id=%s client_id=%s tag_name=%s tag_value=%s secret_access_key=%s",
q(azurecfg.TenantID), q(azurecfg.SubscriptionID), q(azurecfg.ClientID), q(azurecfg.TagName), q(azurecfg.TagValue), q(azurecfg.SecretAccessKey))
}
// do not retry join
if len(cfg.RetryJoin) == 0 && args == "" {
return
}
var n int
var err error
var servers []string
discovered, err := discover.Discover(args, a.logger)
if err != nil {
goto Retry
var err error
switch {
case ec2Enabled:
servers, err = cfg.discoverEc2Hosts(a.logger)
if err != nil {
a.logger.Printf("[ERR] agent: Unable to query EC2 instances: %s", err)
}
a.logger.Printf("[INFO] agent: Discovered %d servers from EC2", len(servers))
case gceEnabled:
servers, err = cfg.discoverGCEHosts(a.logger)
if err != nil {
a.logger.Printf("[ERR] agent: Unable to query GCE instances: %s", err)
}
a.logger.Printf("[INFO] agent: Discovered %d servers from GCE", len(servers))
case azureEnabled:
servers, err = cfg.discoverAzureHosts(a.logger)
if err != nil {
a.logger.Printf("[ERR] agent: Unable to query Azure instances: %s", err)
}
a.logger.Printf("[INFO] agent: Discovered %d servers from Azure", len(servers))
}
servers = discovered
servers = append(servers, cfg.RetryJoin...)
if len(servers) == 0 {
err = fmt.Errorf("No servers to join")
goto Retry
} else {
n, err := a.JoinLAN(servers)
if err == nil {
a.logger.Printf("[INFO] agent: Join completed. Synced with %d initial agents", n)
return
}
}
n, err = a.JoinLAN(servers)
if err == nil {
a.logger.Printf("[INFO] agent: Join completed. Synced with %d initial agents", n)
return
}
Retry:
attempts--
if attempts <= 0 {
attempt++
if cfg.RetryMaxAttempts > 0 && attempt > cfg.RetryMaxAttempts {
a.retryJoinCh <- fmt.Errorf("agent: max join retry exhausted, exiting")
return
}
a.logger.Printf("[WARN] agent: Join failed: %v, retrying in %v", err, cfg.RetryInterval)
time.Sleep(cfg.RetryInterval)
}

Some files were not shown because too many files have changed in this diff Show More