mirror of
https://github.com/status-im/consul.git
synced 2025-01-28 06:25:25 +00:00
Merge branch 'master' of https://github.com/alistanis/consul into use-http-package-statuses
This commit is contained in:
commit
5a07e89bb9
32
CHANGELOG.md
32
CHANGELOG.md
@ -2,6 +2,16 @@
|
||||
|
||||
IMPROVEMENTS:
|
||||
|
||||
* Consul agents will now periodically reconnect to available Consul servers
|
||||
in order to redistribute their RPC query load. Consul clients will, by
|
||||
default, attempt to establish a new connection every 120s to 180s, however
|
||||
the rate at which agents begin to query new servers is proportional to the
|
||||
size of the Consul cluster (servers should never receive more than 64 new
|
||||
connections per second per Consul server as a result of rebalancing).
|
||||
Clusters in stable environments who use `allow_stale` should see a more
|
||||
even distribution of query load across all of their Consul
|
||||
servers. [GH-1667]
|
||||
|
||||
BUG FIXES:
|
||||
|
||||
* Updated the internal web ui (`-ui` option) to latest released build, fixing
|
||||
@ -46,7 +56,7 @@ IMPROVEMENTS:
|
||||
messages and HTTP access logs [GH-1513] [GH-1448]
|
||||
* API clients configured for insecure SSL now use an HTTP transport that's
|
||||
set up the same way as the Go default transport [GH-1526]
|
||||
* Added new per-host telemery on DNS requests [GH-1537]
|
||||
* Added new per-host telemetry on DNS requests [GH-1537]
|
||||
* Added support for reaping child processes which is useful when running
|
||||
Consul as PID 1 in Docker containers [GH-1539]
|
||||
* Added new `-ui` command line and `ui` config option that enables a built-in
|
||||
@ -113,12 +123,12 @@ BUG FIXES:
|
||||
* Fixed bad lock handler execution during shutdown [GH-1080] [GH-1158] [GH-1214]
|
||||
* Added missing support for AAAA queries for nodes [GH-1222]
|
||||
* Tokens passed from the CLI or API work for maint mode [GH-1230]
|
||||
* Fixed service derigister/reregister flaps that could happen during
|
||||
* Fixed service deregister/reregister flaps that could happen during
|
||||
`consul reload` [GH-1235]
|
||||
* Fixed the Go API client to properly distinguish between expired sessions
|
||||
and sessions that don't exist [GH-1041]
|
||||
* Fixed the KV section of the UI to work on Safari [GH-1321]
|
||||
* Cleaned up Javascript for built-in UI with bug fixes [GH-1338]
|
||||
* Cleaned up JavaScript for built-in UI with bug fixes [GH-1338]
|
||||
|
||||
IMPROVEMENTS:
|
||||
|
||||
@ -255,8 +265,8 @@ FEATURES:
|
||||
* Merge `armon/consul-api` into `api` as official Go client.
|
||||
* Support for distributed locks and semaphores in API client [GH-594] [GH-600]
|
||||
* Support for native HTTP health checks [GH-592]
|
||||
* Support for node and service maintanence modes [GH-606]
|
||||
* Added new "consul maint" command to easily toggle maintanence modes [GH-625]
|
||||
* Support for node and service maintenance modes [GH-606]
|
||||
* Added new "consul maint" command to easily toggle maintenance modes [GH-625]
|
||||
* Added new "consul lock" command for simple highly-available deployments.
|
||||
This lets Consul manage the leader election and easily handle N+1 deployments
|
||||
without the applications being Consul aware. [GH-619]
|
||||
@ -336,7 +346,7 @@ BUG FIXES:
|
||||
* Fixing issue with Session ID and ACL ID generation. [GH-391]
|
||||
* Fixing multiple headers for /v1/event/list endpoint [GH-361]
|
||||
* Fixing graceful leave of leader causing invalid Raft peers [GH-360]
|
||||
* Fixing bug with closing TLS connction on error
|
||||
* Fixing bug with closing TLS connection on error
|
||||
* Fixing issue with node reaping [GH-371]
|
||||
* Fixing aggressive deadlock time [GH-389]
|
||||
* Fixing syslog filter level [GH-272]
|
||||
@ -348,7 +358,7 @@ BUG FIXES:
|
||||
IMPROVEMENTS:
|
||||
|
||||
* Use "critical" health state instead of "unknown" [GH-341]
|
||||
* Consul service can be targed for exec [GH-344]
|
||||
* Consul service can be targeted for exec [GH-344]
|
||||
* Provide debug logging for session invalidation [GH-390]
|
||||
* Added "Deregister" button to UI [GH-364]
|
||||
* Added `enable_truncate` DNS configuration flag [GH-376]
|
||||
@ -417,7 +427,7 @@ BUG FIXES:
|
||||
* Fixed handling of `-rejoin` flag
|
||||
* Restored 0.2 TLS behavior, thanks to @nelhage [GH-233]
|
||||
* Fix the statsite flags, thanks to @nelhage [GH-243]
|
||||
* Fixed filters on criticial / non-passing checks [GH-241]
|
||||
* Fixed filters on critical / non-passing checks [GH-241]
|
||||
* Fixed initial log compaction crash [GH-297]
|
||||
|
||||
IMPROVEMENTS:
|
||||
@ -448,7 +458,7 @@ IMPROVEMENTS:
|
||||
* `info` includes build version information
|
||||
* Sorted results for datacneter list [GH-198]
|
||||
* Switch multiplexing to yamux
|
||||
* Allow multiple CA certis in ca_file [GH-174]
|
||||
* Allow multiple CA certs in ca_file [GH-174]
|
||||
* Enable logging to syslog. [GH-105]
|
||||
* Allow raw key value lookup [GH-150]
|
||||
* Log encryption enabled [GH-151]
|
||||
@ -500,7 +510,7 @@ BUG FIXES:
|
||||
* Windows agents won't show "failed to decode" errors on every RPC
|
||||
request.
|
||||
* Fixed memory leak with RPC clients. [GH-149]
|
||||
* Serf name conflict resoultion disabled. [GH-97]
|
||||
* Serf name conflict resolution disabled. [GH-97]
|
||||
* Raft deadlock possibility fixed. [GH-141]
|
||||
|
||||
MISC:
|
||||
@ -523,7 +533,7 @@ FEATURES:
|
||||
allow for higher throughput and read scalability. [GH-68]
|
||||
* /v1/health/service/ endpoint can take an optional `?passing` flag
|
||||
to filter to only nodes with passing results. [GH-57]
|
||||
* The KV endpoint suports listing keys with the `?keys` query parameter,
|
||||
* The KV endpoint supports listing keys with the `?keys` query parameter,
|
||||
and limited up to a separator using `?separator=`.
|
||||
|
||||
IMPROVEMENTS:
|
||||
|
@ -18,6 +18,7 @@ import (
|
||||
"github.com/hashicorp/consul/consul"
|
||||
"github.com/hashicorp/consul/consul/state"
|
||||
"github.com/hashicorp/consul/consul/structs"
|
||||
"github.com/hashicorp/consul/lib"
|
||||
"github.com/hashicorp/serf/coordinate"
|
||||
"github.com/hashicorp/serf/serf"
|
||||
)
|
||||
@ -600,8 +601,8 @@ func (a *Agent) sendCoordinate() {
|
||||
for {
|
||||
rate := a.config.SyncCoordinateRateTarget
|
||||
min := a.config.SyncCoordinateIntervalMin
|
||||
intv := rateScaledInterval(rate, min, len(a.LANMembers()))
|
||||
intv = intv + randomStagger(intv)
|
||||
intv := lib.RateScaledInterval(rate, min, len(a.LANMembers()))
|
||||
intv = intv + lib.RandomStagger(intv)
|
||||
|
||||
select {
|
||||
case <-time.After(intv):
|
||||
|
@ -15,6 +15,7 @@ import (
|
||||
"github.com/armon/circbuf"
|
||||
docker "github.com/fsouza/go-dockerclient"
|
||||
"github.com/hashicorp/consul/consul/structs"
|
||||
"github.com/hashicorp/consul/lib"
|
||||
"github.com/hashicorp/go-cleanhttp"
|
||||
)
|
||||
|
||||
@ -131,7 +132,7 @@ func (c *CheckMonitor) Stop() {
|
||||
// run is invoked by a goroutine to run until Stop() is called
|
||||
func (c *CheckMonitor) run() {
|
||||
// Get the randomized initial pause time
|
||||
initialPauseTime := randomStagger(c.Interval)
|
||||
initialPauseTime := lib.RandomStagger(c.Interval)
|
||||
c.Logger.Printf("[DEBUG] agent: pausing %v before first invocation of %s", initialPauseTime, c.Script)
|
||||
next := time.After(initialPauseTime)
|
||||
for {
|
||||
@ -366,7 +367,7 @@ func (c *CheckHTTP) Stop() {
|
||||
// run is invoked by a goroutine to run until Stop() is called
|
||||
func (c *CheckHTTP) run() {
|
||||
// Get the randomized initial pause time
|
||||
initialPauseTime := randomStagger(c.Interval)
|
||||
initialPauseTime := lib.RandomStagger(c.Interval)
|
||||
c.Logger.Printf("[DEBUG] agent: pausing %v before first HTTP request of %s", initialPauseTime, c.HTTP)
|
||||
next := time.After(initialPauseTime)
|
||||
for {
|
||||
@ -482,7 +483,7 @@ func (c *CheckTCP) Stop() {
|
||||
// run is invoked by a goroutine to run until Stop() is called
|
||||
func (c *CheckTCP) run() {
|
||||
// Get the randomized initial pause time
|
||||
initialPauseTime := randomStagger(c.Interval)
|
||||
initialPauseTime := lib.RandomStagger(c.Interval)
|
||||
c.Logger.Printf("[DEBUG] agent: pausing %v before first socket connection of %s", initialPauseTime, c.TCP)
|
||||
next := time.After(initialPauseTime)
|
||||
for {
|
||||
@ -580,7 +581,7 @@ func (c *CheckDocker) Stop() {
|
||||
// run is invoked by a goroutine to run until Stop() is called
|
||||
func (c *CheckDocker) run() {
|
||||
// Get the randomized initial pause time
|
||||
initialPauseTime := randomStagger(c.Interval)
|
||||
initialPauseTime := lib.RandomStagger(c.Interval)
|
||||
c.Logger.Printf("[DEBUG] agent: pausing %v before first invocation of %s -c %s in container %s", initialPauseTime, c.Shell, c.Script, c.DockerContainerID)
|
||||
next := time.After(initialPauseTime)
|
||||
for {
|
||||
|
@ -16,6 +16,7 @@ import (
|
||||
|
||||
"github.com/armon/go-metrics"
|
||||
"github.com/armon/go-metrics/datadog"
|
||||
"github.com/hashicorp/consul/lib"
|
||||
"github.com/hashicorp/consul/watch"
|
||||
"github.com/hashicorp/go-checkpoint"
|
||||
"github.com/hashicorp/go-reap"
|
||||
@ -424,7 +425,7 @@ func (c *Command) setupAgent(config *Config, logOutput io.Writer, logWriter *log
|
||||
|
||||
// Do an immediate check within the next 30 seconds
|
||||
go func() {
|
||||
time.Sleep(randomStagger(30 * time.Second))
|
||||
time.Sleep(lib.RandomStagger(30 * time.Second))
|
||||
c.checkpointResults(checkpoint.Check(updateParams))
|
||||
}()
|
||||
}
|
||||
|
@ -13,6 +13,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/consul/consul"
|
||||
"github.com/hashicorp/consul/lib"
|
||||
"github.com/hashicorp/consul/watch"
|
||||
"github.com/mitchellh/mapstructure"
|
||||
)
|
||||
@ -634,7 +635,7 @@ func DecodeConfig(r io.Reader) (*Config, error) {
|
||||
allowedKeys := []string{"service", "services", "check", "checks"}
|
||||
var unused []string
|
||||
for _, field := range md.Unused {
|
||||
if !strContains(allowedKeys, field) {
|
||||
if !lib.StrContains(allowedKeys, field) {
|
||||
unused = append(unused, field)
|
||||
}
|
||||
}
|
||||
|
@ -11,6 +11,8 @@ import (
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/consul/lib"
|
||||
)
|
||||
|
||||
func TestConfigEncryptBytes(t *testing.T) {
|
||||
@ -1103,7 +1105,7 @@ func TestDecodeConfig_Service(t *testing.T) {
|
||||
t.Fatalf("bad: %v", serv)
|
||||
}
|
||||
|
||||
if !strContains(serv.Tags, "master") {
|
||||
if !lib.StrContains(serv.Tags, "master") {
|
||||
t.Fatalf("bad: %v", serv)
|
||||
}
|
||||
|
||||
|
@ -10,6 +10,7 @@ import (
|
||||
|
||||
"github.com/hashicorp/consul/consul"
|
||||
"github.com/hashicorp/consul/consul/structs"
|
||||
"github.com/hashicorp/consul/lib"
|
||||
)
|
||||
|
||||
const (
|
||||
@ -252,7 +253,7 @@ func (l *localState) UpdateCheck(checkID, status, output string) {
|
||||
if l.config.CheckUpdateInterval > 0 && check.Status == status {
|
||||
check.Output = output
|
||||
if _, ok := l.deferCheck[checkID]; !ok {
|
||||
intv := time.Duration(uint64(l.config.CheckUpdateInterval)/2) + randomStagger(l.config.CheckUpdateInterval)
|
||||
intv := time.Duration(uint64(l.config.CheckUpdateInterval)/2) + lib.RandomStagger(l.config.CheckUpdateInterval)
|
||||
deferSync := time.AfterFunc(intv, func() {
|
||||
l.Lock()
|
||||
if _, ok := l.checkStatus[checkID]; ok {
|
||||
@ -302,11 +303,11 @@ SYNC:
|
||||
case <-l.consulCh:
|
||||
// Stagger the retry on leader election, avoid a thundering heard
|
||||
select {
|
||||
case <-time.After(randomStagger(aeScale(syncStaggerIntv, len(l.iface.LANMembers())))):
|
||||
case <-time.After(lib.RandomStagger(aeScale(syncStaggerIntv, len(l.iface.LANMembers())))):
|
||||
case <-shutdownCh:
|
||||
return
|
||||
}
|
||||
case <-time.After(syncRetryIntv + randomStagger(aeScale(syncRetryIntv, len(l.iface.LANMembers())))):
|
||||
case <-time.After(syncRetryIntv + lib.RandomStagger(aeScale(syncRetryIntv, len(l.iface.LANMembers())))):
|
||||
case <-shutdownCh:
|
||||
return
|
||||
}
|
||||
@ -317,7 +318,7 @@ SYNC:
|
||||
|
||||
// Schedule the next full sync, with a random stagger
|
||||
aeIntv := aeScale(l.config.AEInterval, len(l.iface.LANMembers()))
|
||||
aeIntv = aeIntv + randomStagger(aeIntv)
|
||||
aeIntv = aeIntv + lib.RandomStagger(aeIntv)
|
||||
aeTimer := time.After(aeIntv)
|
||||
|
||||
// Wait for sync events
|
||||
|
@ -3,6 +3,7 @@ package agent
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"os"
|
||||
"reflect"
|
||||
"testing"
|
||||
@ -10,8 +11,17 @@ import (
|
||||
|
||||
"github.com/hashicorp/consul/consul/structs"
|
||||
"github.com/hashicorp/consul/testutil"
|
||||
"github.com/hashicorp/go-uuid"
|
||||
)
|
||||
|
||||
func generateUUID() (ret string) {
|
||||
var err error
|
||||
if ret, err = uuid.GenerateUUID(); err != nil {
|
||||
panic(fmt.Sprintf("Unable to generate a UUID, %v", err))
|
||||
}
|
||||
return ret
|
||||
}
|
||||
|
||||
func TestRexecWriter(t *testing.T) {
|
||||
writer := &rexecWriter{
|
||||
BufCh: make(chan []byte, 16),
|
||||
|
@ -5,6 +5,7 @@ import (
|
||||
"regexp"
|
||||
|
||||
"github.com/hashicorp/consul/consul/structs"
|
||||
"github.com/hashicorp/go-uuid"
|
||||
)
|
||||
|
||||
const (
|
||||
@ -78,7 +79,10 @@ func (a *Agent) UserEvent(dc, token string, params *UserEvent) error {
|
||||
}
|
||||
|
||||
// Format message
|
||||
params.ID = generateUUID()
|
||||
var err error
|
||||
if params.ID, err = uuid.GenerateUUID(); err != nil {
|
||||
return fmt.Errorf("UUID generation failed: %v", err)
|
||||
}
|
||||
params.Version = userEventMaxVersion
|
||||
payload, err := encodeMsgPack(¶ms)
|
||||
if err != nil {
|
||||
|
@ -3,10 +3,8 @@ package agent
|
||||
import (
|
||||
"bytes"
|
||||
"crypto/md5"
|
||||
crand "crypto/rand"
|
||||
"fmt"
|
||||
"math"
|
||||
"math/rand"
|
||||
"os"
|
||||
"os/exec"
|
||||
"os/user"
|
||||
@ -39,32 +37,6 @@ func aeScale(interval time.Duration, n int) time.Duration {
|
||||
return time.Duration(multiplier) * interval
|
||||
}
|
||||
|
||||
// rateScaledInterval is used to choose an interval to perform an action in order
|
||||
// to target an aggregate number of actions per second across the whole cluster.
|
||||
func rateScaledInterval(rate float64, min time.Duration, n int) time.Duration {
|
||||
interval := time.Duration(float64(time.Second) * float64(n) / rate)
|
||||
if interval < min {
|
||||
return min
|
||||
}
|
||||
|
||||
return interval
|
||||
}
|
||||
|
||||
// Returns a random stagger interval between 0 and the duration
|
||||
func randomStagger(intv time.Duration) time.Duration {
|
||||
return time.Duration(uint64(rand.Int63()) % uint64(intv))
|
||||
}
|
||||
|
||||
// strContains checks if a list contains a string
|
||||
func strContains(l []string, s string) bool {
|
||||
for _, v := range l {
|
||||
if v == s {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// ExecScript returns a command to execute a script
|
||||
func ExecScript(script string) (*exec.Cmd, error) {
|
||||
var shell, flag string
|
||||
@ -82,21 +54,6 @@ func ExecScript(script string) (*exec.Cmd, error) {
|
||||
return cmd, nil
|
||||
}
|
||||
|
||||
// generateUUID is used to generate a random UUID
|
||||
func generateUUID() string {
|
||||
buf := make([]byte, 16)
|
||||
if _, err := crand.Read(buf); err != nil {
|
||||
panic(fmt.Errorf("failed to read random bytes: %v", err))
|
||||
}
|
||||
|
||||
return fmt.Sprintf("%08x-%04x-%04x-%04x-%12x",
|
||||
buf[0:4],
|
||||
buf[4:6],
|
||||
buf[6:8],
|
||||
buf[8:10],
|
||||
buf[10:16])
|
||||
}
|
||||
|
||||
// decodeMsgPack is used to decode a MsgPack encoded object
|
||||
func decodeMsgPack(buf []byte, out interface{}) error {
|
||||
return codec.NewDecoder(bytes.NewReader(buf), msgpackHandle).Decode(out)
|
||||
|
@ -24,39 +24,6 @@ func TestAEScale(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestRateScaledInterval(t *testing.T) {
|
||||
min := 1 * time.Second
|
||||
rate := 200.0
|
||||
if v := rateScaledInterval(rate, min, 0); v != min {
|
||||
t.Fatalf("Bad: %v", v)
|
||||
}
|
||||
if v := rateScaledInterval(rate, min, 100); v != min {
|
||||
t.Fatalf("Bad: %v", v)
|
||||
}
|
||||
if v := rateScaledInterval(rate, min, 200); v != 1*time.Second {
|
||||
t.Fatalf("Bad: %v", v)
|
||||
}
|
||||
if v := rateScaledInterval(rate, min, 1000); v != 5*time.Second {
|
||||
t.Fatalf("Bad: %v", v)
|
||||
}
|
||||
if v := rateScaledInterval(rate, min, 5000); v != 25*time.Second {
|
||||
t.Fatalf("Bad: %v", v)
|
||||
}
|
||||
if v := rateScaledInterval(rate, min, 10000); v != 50*time.Second {
|
||||
t.Fatalf("Bad: %v", v)
|
||||
}
|
||||
}
|
||||
|
||||
func TestRandomStagger(t *testing.T) {
|
||||
intv := time.Minute
|
||||
for i := 0; i < 10; i++ {
|
||||
stagger := randomStagger(intv)
|
||||
if stagger < 0 || stagger >= intv {
|
||||
t.Fatalf("Bad: %v", stagger)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestStringHash(t *testing.T) {
|
||||
in := "hello world"
|
||||
expected := "5eb63bbbe01eeed093cb22bb8f5acdc3"
|
||||
|
@ -7,6 +7,7 @@ import (
|
||||
"github.com/armon/go-metrics"
|
||||
"github.com/hashicorp/consul/acl"
|
||||
"github.com/hashicorp/consul/consul/structs"
|
||||
"github.com/hashicorp/go-uuid"
|
||||
)
|
||||
|
||||
// ACL endpoint is used to manipulate ACLs
|
||||
@ -62,7 +63,11 @@ func (a *ACL) Apply(args *structs.ACLRequest, reply *string) error {
|
||||
if args.ACL.ID == "" {
|
||||
state := a.srv.fsm.State()
|
||||
for {
|
||||
args.ACL.ID = generateUUID()
|
||||
if args.ACL.ID, err = uuid.GenerateUUID(); err != nil {
|
||||
a.srv.logger.Printf("[ERR] consul.acl: UUID generation failed: %v", err)
|
||||
return err
|
||||
}
|
||||
|
||||
_, acl, err := state.ACLGet(args.ACL.ID)
|
||||
if err != nil {
|
||||
a.srv.logger.Printf("[ERR] consul.acl: ACL lookup failed: %v", err)
|
||||
|
@ -7,6 +7,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/consul/consul/structs"
|
||||
"github.com/hashicorp/consul/lib"
|
||||
"github.com/hashicorp/consul/testutil"
|
||||
"github.com/hashicorp/net-rpc-msgpackrpc"
|
||||
)
|
||||
@ -436,7 +437,7 @@ func TestACLEndpoint_List(t *testing.T) {
|
||||
if s.ID == anonymousToken || s.ID == "root" {
|
||||
continue
|
||||
}
|
||||
if !strContains(ids, s.ID) {
|
||||
if !lib.StrContains(ids, s.ID) {
|
||||
t.Fatalf("bad: %v", s)
|
||||
}
|
||||
if s.Name != "User token" {
|
||||
|
@ -9,6 +9,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/consul/consul/structs"
|
||||
"github.com/hashicorp/consul/lib"
|
||||
"github.com/hashicorp/consul/testutil"
|
||||
"github.com/hashicorp/net-rpc-msgpackrpc"
|
||||
)
|
||||
@ -978,7 +979,7 @@ func TestCatalogNodeServices(t *testing.T) {
|
||||
t.Fatalf("bad: %v", out)
|
||||
}
|
||||
services := out.NodeServices.Services
|
||||
if !strContains(services["db"].Tags, "primary") || services["db"].Port != 5000 {
|
||||
if !lib.StrContains(services["db"].Tags, "primary") || services["db"].Port != 5000 {
|
||||
t.Fatalf("bad: %v", out)
|
||||
}
|
||||
if len(services["web"].Tags) != 0 || services["web"].Port != 80 {
|
||||
|
@ -12,14 +12,51 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/consul/consul/structs"
|
||||
"github.com/hashicorp/consul/lib"
|
||||
"github.com/hashicorp/serf/coordinate"
|
||||
"github.com/hashicorp/serf/serf"
|
||||
)
|
||||
|
||||
const (
|
||||
// clientRPCCache controls how long we keep an idle connection
|
||||
// open to a server
|
||||
clientRPCCache = 30 * time.Second
|
||||
// clientRPCMinReuseDuration controls the minimum amount of time RPC
|
||||
// queries are sent over an established connection to a single server
|
||||
clientRPCMinReuseDuration = 120 * time.Second
|
||||
|
||||
// clientRPCJitterFraction determines the amount of jitter added to
|
||||
// clientRPCMinReuseDuration before a connection is expired and a new
|
||||
// connection is established in order to rebalance load across consul
|
||||
// servers. The cluster-wide number of connections per second from
|
||||
// rebalancing is applied after this jitter to ensure the CPU impact
|
||||
// is always finite. See newRebalanceConnsPerSecPerServer's comment
|
||||
// for additional commentary.
|
||||
//
|
||||
// For example, in a 10K consul cluster with 5x servers, this default
|
||||
// averages out to ~13 new connections from rebalancing per server
|
||||
// per second (each connection is reused for 120s to 180s).
|
||||
clientRPCJitterFraction = 2
|
||||
|
||||
// Limit the number of new connections a server receives per second
|
||||
// for connection rebalancing. This limit caps the load caused by
|
||||
// continual rebalancing efforts when a cluster is in equilibrium. A
|
||||
// lower value comes at the cost of increased recovery time after a
|
||||
// partition. This parameter begins to take effect when there are
|
||||
// more than ~48K clients querying 5x servers or at lower server
|
||||
// values when there is a partition.
|
||||
//
|
||||
// For example, in a 100K consul cluster with 5x servers, it will
|
||||
// take ~5min for all servers to rebalance their connections. If
|
||||
// 99,995 agents are in the minority talking to only one server, it
|
||||
// will take ~26min for all servers to rebalance. A 10K cluster in
|
||||
// the same scenario will take ~2.6min to rebalance.
|
||||
newRebalanceConnsPerSecPerServer = 64
|
||||
|
||||
// clientRPCConnMaxIdle controls how long we keep an idle connection
|
||||
// open to a server. 127s was chosen as the first prime above 120s
|
||||
// (arbitrarily chose to use a prime) with the intent of reusing
|
||||
// connections who are used by once-a-minute cron(8) jobs *and* who
|
||||
// use a 60s jitter window (e.g. in vixie cron job execution can
|
||||
// drift by up to 59s per job, or 119s for a once-a-minute cron job).
|
||||
clientRPCConnMaxIdle = 127 * time.Second
|
||||
|
||||
// clientMaxStreams controls how many idle streams we keep
|
||||
// open to a server
|
||||
@ -56,6 +93,10 @@ type Client struct {
|
||||
lastServer *serverParts
|
||||
lastRPCTime time.Time
|
||||
|
||||
// connRebalanceTime is the time at which we should change the server
|
||||
// we query for RPC requests.
|
||||
connRebalanceTime time.Time
|
||||
|
||||
// Logger uses the provided LogOutput
|
||||
logger *log.Logger
|
||||
|
||||
@ -103,7 +144,7 @@ func NewClient(config *Config) (*Client, error) {
|
||||
// Create server
|
||||
c := &Client{
|
||||
config: config,
|
||||
connPool: NewPool(config.LogOutput, clientRPCCache, clientMaxStreams, tlsWrap),
|
||||
connPool: NewPool(config.LogOutput, clientRPCConnMaxIdle, clientMaxStreams, tlsWrap),
|
||||
eventCh: make(chan serf.Event, 256),
|
||||
logger: logger,
|
||||
shutdownCh: make(chan struct{}),
|
||||
@ -328,37 +369,64 @@ func (c *Client) localEvent(event serf.UserEvent) {
|
||||
|
||||
// RPC is used to forward an RPC call to a consul server, or fail if no servers
|
||||
func (c *Client) RPC(method string, args interface{}, reply interface{}) error {
|
||||
// Check the last rpc time
|
||||
// Check to make sure we haven't spent too much time querying a
|
||||
// single server
|
||||
now := time.Now()
|
||||
if !c.connRebalanceTime.IsZero() && now.After(c.connRebalanceTime) {
|
||||
c.logger.Printf("[DEBUG] consul: connection time to server %s exceeded, rotating server connection", c.lastServer.Addr)
|
||||
c.lastServer = nil
|
||||
}
|
||||
|
||||
// Allocate these vars on the stack before the goto
|
||||
var numConsulServers int
|
||||
var clusterWideRebalanceConnsPerSec float64
|
||||
var connReuseLowWaterMark time.Duration
|
||||
var numLANMembers int
|
||||
|
||||
// Check the last RPC time, continue to reuse cached connection for
|
||||
// up to clientRPCMinReuseDuration unless exceeded
|
||||
// clientRPCConnMaxIdle
|
||||
lastRPCTime := now.Sub(c.lastRPCTime)
|
||||
var server *serverParts
|
||||
if time.Now().Sub(c.lastRPCTime) < clientRPCCache {
|
||||
if c.lastServer != nil && lastRPCTime < clientRPCConnMaxIdle {
|
||||
server = c.lastServer
|
||||
if server != nil {
|
||||
goto TRY_RPC
|
||||
}
|
||||
goto TRY_RPC
|
||||
}
|
||||
|
||||
// Bail if we can't find any servers
|
||||
c.consulLock.RLock()
|
||||
if len(c.consuls) == 0 {
|
||||
numConsulServers = len(c.consuls)
|
||||
if numConsulServers == 0 {
|
||||
c.consulLock.RUnlock()
|
||||
return structs.ErrNoServers
|
||||
}
|
||||
|
||||
// Select a random addr
|
||||
server = c.consuls[rand.Int31()%int32(len(c.consuls))]
|
||||
server = c.consuls[rand.Int31n(int32(numConsulServers))]
|
||||
c.consulLock.RUnlock()
|
||||
|
||||
// Limit this connection's life based on the size (and health) of the
|
||||
// cluster. Never rebalance a connection more frequently than
|
||||
// connReuseLowWaterMark, and make sure we never exceed
|
||||
// clusterWideRebalanceConnsPerSec operations/s across numLANMembers.
|
||||
clusterWideRebalanceConnsPerSec = float64(numConsulServers * newRebalanceConnsPerSecPerServer)
|
||||
connReuseLowWaterMark = clientRPCMinReuseDuration + lib.RandomStagger(clientRPCMinReuseDuration/clientRPCJitterFraction)
|
||||
numLANMembers = len(c.LANMembers())
|
||||
c.connRebalanceTime = now.Add(lib.RateScaledInterval(clusterWideRebalanceConnsPerSec, connReuseLowWaterMark, numLANMembers))
|
||||
c.logger.Printf("[DEBUG] consul: connection to server %s will expire at %v", server.Addr, c.connRebalanceTime)
|
||||
|
||||
// Forward to remote Consul
|
||||
TRY_RPC:
|
||||
if err := c.connPool.RPC(c.config.Datacenter, server.Addr, server.Version, method, args, reply); err != nil {
|
||||
c.lastServer = nil
|
||||
c.connRebalanceTime = time.Time{}
|
||||
c.lastRPCTime = time.Time{}
|
||||
c.lastServer = nil
|
||||
return err
|
||||
}
|
||||
|
||||
// Cache the last server
|
||||
c.lastServer = server
|
||||
c.lastRPCTime = time.Now()
|
||||
c.lastRPCTime = now
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -2,12 +2,15 @@ package consul
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"os"
|
||||
"reflect"
|
||||
"testing"
|
||||
|
||||
"github.com/hashicorp/consul/consul/state"
|
||||
"github.com/hashicorp/consul/consul/structs"
|
||||
"github.com/hashicorp/consul/lib"
|
||||
"github.com/hashicorp/go-uuid"
|
||||
"github.com/hashicorp/raft"
|
||||
)
|
||||
|
||||
@ -38,6 +41,14 @@ func makeLog(buf []byte) *raft.Log {
|
||||
}
|
||||
}
|
||||
|
||||
func generateUUID() (ret string) {
|
||||
var err error
|
||||
if ret, err = uuid.GenerateUUID(); err != nil {
|
||||
panic(fmt.Sprintf("Unable to generate a UUID, %v", err))
|
||||
}
|
||||
return ret
|
||||
}
|
||||
|
||||
func TestFSM_RegisterNode(t *testing.T) {
|
||||
fsm, err := NewFSM(nil, os.Stderr)
|
||||
if err != nil {
|
||||
@ -452,7 +463,7 @@ func TestFSM_SnapshotRestore(t *testing.T) {
|
||||
if len(fooSrv.Services) != 2 {
|
||||
t.Fatalf("Bad: %v", fooSrv)
|
||||
}
|
||||
if !strContains(fooSrv.Services["db"].Tags, "primary") {
|
||||
if !lib.StrContains(fooSrv.Services["db"].Tags, "primary") {
|
||||
t.Fatalf("Bad: %v", fooSrv)
|
||||
}
|
||||
if fooSrv.Services["db"].Port != 5000 {
|
||||
|
@ -6,6 +6,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/consul/consul/structs"
|
||||
"github.com/hashicorp/consul/lib"
|
||||
"github.com/hashicorp/consul/testutil"
|
||||
"github.com/hashicorp/net-rpc-msgpackrpc"
|
||||
)
|
||||
@ -377,10 +378,10 @@ func TestHealth_ServiceNodes(t *testing.T) {
|
||||
if nodes[1].Node.Node != "foo" {
|
||||
t.Fatalf("Bad: %v", nodes[1])
|
||||
}
|
||||
if !strContains(nodes[0].Service.Tags, "slave") {
|
||||
if !lib.StrContains(nodes[0].Service.Tags, "slave") {
|
||||
t.Fatalf("Bad: %v", nodes[0])
|
||||
}
|
||||
if !strContains(nodes[1].Service.Tags, "master") {
|
||||
if !lib.StrContains(nodes[1].Service.Tags, "master") {
|
||||
t.Fatalf("Bad: %v", nodes[1])
|
||||
}
|
||||
if nodes[0].Checks[0].Status != structs.HealthWarning {
|
||||
|
@ -7,6 +7,7 @@ import (
|
||||
"testing"
|
||||
|
||||
"github.com/hashicorp/consul/consul/structs"
|
||||
"github.com/hashicorp/consul/lib"
|
||||
"github.com/hashicorp/consul/testutil"
|
||||
"github.com/hashicorp/net-rpc-msgpackrpc"
|
||||
)
|
||||
@ -56,7 +57,7 @@ func TestInternal_NodeInfo(t *testing.T) {
|
||||
if nodes[0].Node != "foo" {
|
||||
t.Fatalf("Bad: %v", nodes[0])
|
||||
}
|
||||
if !strContains(nodes[0].Services[0].Tags, "master") {
|
||||
if !lib.StrContains(nodes[0].Services[0].Tags, "master") {
|
||||
t.Fatalf("Bad: %v", nodes[0])
|
||||
}
|
||||
if nodes[0].Checks[0].Status != structs.HealthPassing {
|
||||
@ -130,7 +131,7 @@ func TestInternal_NodeDump(t *testing.T) {
|
||||
switch node.Node {
|
||||
case "foo":
|
||||
foundFoo = true
|
||||
if !strContains(node.Services[0].Tags, "master") {
|
||||
if !lib.StrContains(node.Services[0].Tags, "master") {
|
||||
t.Fatalf("Bad: %v", nodes[0])
|
||||
}
|
||||
if node.Checks[0].Status != structs.HealthPassing {
|
||||
@ -139,7 +140,7 @@ func TestInternal_NodeDump(t *testing.T) {
|
||||
|
||||
case "bar":
|
||||
foundBar = true
|
||||
if !strContains(node.Services[0].Tags, "slave") {
|
||||
if !lib.StrContains(node.Services[0].Tags, "slave") {
|
||||
t.Fatalf("Bad: %v", nodes[1])
|
||||
}
|
||||
if node.Checks[0].Status != structs.HealthWarning {
|
||||
|
@ -9,6 +9,7 @@ import (
|
||||
|
||||
"github.com/armon/go-metrics"
|
||||
"github.com/hashicorp/consul/consul/structs"
|
||||
"github.com/hashicorp/go-uuid"
|
||||
)
|
||||
|
||||
var (
|
||||
@ -41,7 +42,9 @@ func (p *PreparedQuery) Apply(args *structs.PreparedQueryRequest, reply *string)
|
||||
// to collide since this isn't inside a write transaction.
|
||||
state := p.srv.fsm.State()
|
||||
for {
|
||||
args.Query.ID = generateUUID()
|
||||
if args.Query.ID, err = uuid.GenerateUUID(); err != nil {
|
||||
return fmt.Errorf("UUID generation for prepared query failed: %v", err)
|
||||
}
|
||||
_, query, err := state.PreparedQueryGet(args.Query.ID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Prepared query lookup failed: %v", err)
|
||||
|
@ -12,6 +12,7 @@ import (
|
||||
"github.com/armon/go-metrics"
|
||||
"github.com/hashicorp/consul/consul/state"
|
||||
"github.com/hashicorp/consul/consul/structs"
|
||||
"github.com/hashicorp/consul/lib"
|
||||
"github.com/hashicorp/memberlist"
|
||||
"github.com/hashicorp/net-rpc-msgpackrpc"
|
||||
"github.com/hashicorp/yamux"
|
||||
@ -329,7 +330,7 @@ func (s *Server) blockingRPC(queryOpts *structs.QueryOptions, queryMeta *structs
|
||||
}
|
||||
|
||||
// Apply a small amount of jitter to the request.
|
||||
queryOpts.MaxQueryTime += randomStagger(queryOpts.MaxQueryTime / jitterFraction)
|
||||
queryOpts.MaxQueryTime += lib.RandomStagger(queryOpts.MaxQueryTime / jitterFraction)
|
||||
|
||||
// Setup a query timeout.
|
||||
timeout = time.NewTimer(queryOpts.MaxQueryTime)
|
||||
|
@ -6,6 +6,7 @@ import (
|
||||
|
||||
"github.com/armon/go-metrics"
|
||||
"github.com/hashicorp/consul/consul/structs"
|
||||
"github.com/hashicorp/go-uuid"
|
||||
)
|
||||
|
||||
// Session endpoint is used to manipulate sessions for KV
|
||||
@ -61,7 +62,11 @@ func (s *Session) Apply(args *structs.SessionRequest, reply *string) error {
|
||||
// Generate a new session ID, verify uniqueness
|
||||
state := s.srv.fsm.State()
|
||||
for {
|
||||
args.Session.ID = generateUUID()
|
||||
var err error
|
||||
if args.Session.ID, err = uuid.GenerateUUID(); err != nil {
|
||||
s.srv.logger.Printf("[ERR] consul.session: UUID generation failed: %v", err)
|
||||
return err
|
||||
}
|
||||
_, sess, err := state.SessionGet(args.Session.ID)
|
||||
if err != nil {
|
||||
s.srv.logger.Printf("[ERR] consul.session: Session lookup failed: %v", err)
|
||||
|
@ -6,6 +6,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/consul/consul/structs"
|
||||
"github.com/hashicorp/consul/lib"
|
||||
"github.com/hashicorp/consul/testutil"
|
||||
"github.com/hashicorp/net-rpc-msgpackrpc"
|
||||
)
|
||||
@ -217,7 +218,7 @@ func TestSessionEndpoint_List(t *testing.T) {
|
||||
}
|
||||
for i := 0; i < len(sessions.Sessions); i++ {
|
||||
s := sessions.Sessions[i]
|
||||
if !strContains(ids, s.ID) {
|
||||
if !lib.StrContains(ids, s.ID) {
|
||||
t.Fatalf("bad: %v", s)
|
||||
}
|
||||
if s.Node != "foo" {
|
||||
@ -318,7 +319,7 @@ func TestSessionEndpoint_Renew(t *testing.T) {
|
||||
}
|
||||
for i := 0; i < len(sessions.Sessions); i++ {
|
||||
s := sessions.Sessions[i]
|
||||
if !strContains(ids, s.ID) {
|
||||
if !lib.StrContains(ids, s.ID) {
|
||||
t.Fatalf("bad: %v", s)
|
||||
}
|
||||
if s.Node != "foo" {
|
||||
@ -352,7 +353,7 @@ func TestSessionEndpoint_Renew(t *testing.T) {
|
||||
}
|
||||
|
||||
s := session.Sessions[0]
|
||||
if !strContains(ids, s.ID) {
|
||||
if !lib.StrContains(ids, s.ID) {
|
||||
t.Fatalf("bad: %v", s)
|
||||
}
|
||||
if s.Node != "foo" {
|
||||
@ -379,7 +380,7 @@ func TestSessionEndpoint_Renew(t *testing.T) {
|
||||
|
||||
for i := 0; i < len(sessionsL1.Sessions); i++ {
|
||||
s := sessionsL1.Sessions[i]
|
||||
if !strContains(ids, s.ID) {
|
||||
if !lib.StrContains(ids, s.ID) {
|
||||
t.Fatalf("bad: %v", s)
|
||||
}
|
||||
if s.Node != "foo" {
|
||||
@ -411,7 +412,7 @@ func TestSessionEndpoint_Renew(t *testing.T) {
|
||||
if len(sessionsL2.Sessions) != 0 {
|
||||
for i := 0; i < len(sessionsL2.Sessions); i++ {
|
||||
s := sessionsL2.Sessions[i]
|
||||
if !strContains(ids, s.ID) {
|
||||
if !lib.StrContains(ids, s.ID) {
|
||||
t.Fatalf("bad: %v", s)
|
||||
}
|
||||
if s.Node != "foo" {
|
||||
@ -476,7 +477,7 @@ func TestSessionEndpoint_NodeSessions(t *testing.T) {
|
||||
}
|
||||
for i := 0; i < len(sessions.Sessions); i++ {
|
||||
s := sessions.Sessions[i]
|
||||
if !strContains(ids, s.ID) {
|
||||
if !lib.StrContains(ids, s.ID) {
|
||||
t.Fatalf("bad: %v", s)
|
||||
}
|
||||
if s.Node != "foo" {
|
||||
|
@ -11,6 +11,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/consul/consul/structs"
|
||||
"github.com/hashicorp/consul/lib"
|
||||
"github.com/hashicorp/serf/coordinate"
|
||||
)
|
||||
|
||||
@ -1189,16 +1190,6 @@ func TestStateStore_Services(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// strContains checks if a list contains a string
|
||||
func strContains(l []string, s string) bool {
|
||||
for _, v := range l {
|
||||
if v == s {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func TestStateStore_ServiceNodes(t *testing.T) {
|
||||
s := testStateStore(t)
|
||||
|
||||
@ -1249,7 +1240,7 @@ func TestStateStore_ServiceNodes(t *testing.T) {
|
||||
if nodes[0].ServiceID != "db" {
|
||||
t.Fatalf("bad: %v", nodes)
|
||||
}
|
||||
if !strContains(nodes[0].ServiceTags, "slave") {
|
||||
if !lib.StrContains(nodes[0].ServiceTags, "slave") {
|
||||
t.Fatalf("bad: %v", nodes)
|
||||
}
|
||||
if nodes[0].ServicePort != 8000 {
|
||||
@ -1265,7 +1256,7 @@ func TestStateStore_ServiceNodes(t *testing.T) {
|
||||
if nodes[1].ServiceID != "db2" {
|
||||
t.Fatalf("bad: %v", nodes)
|
||||
}
|
||||
if !strContains(nodes[1].ServiceTags, "slave") {
|
||||
if !lib.StrContains(nodes[1].ServiceTags, "slave") {
|
||||
t.Fatalf("bad: %v", nodes)
|
||||
}
|
||||
if nodes[1].ServicePort != 8001 {
|
||||
@ -1281,7 +1272,7 @@ func TestStateStore_ServiceNodes(t *testing.T) {
|
||||
if nodes[2].ServiceID != "db" {
|
||||
t.Fatalf("bad: %v", nodes)
|
||||
}
|
||||
if !strContains(nodes[2].ServiceTags, "master") {
|
||||
if !lib.StrContains(nodes[2].ServiceTags, "master") {
|
||||
t.Fatalf("bad: %v", nodes)
|
||||
}
|
||||
if nodes[2].ServicePort != 8000 {
|
||||
@ -1328,7 +1319,7 @@ func TestStateStore_ServiceTagNodes(t *testing.T) {
|
||||
if nodes[0].Address != "127.0.0.1" {
|
||||
t.Fatalf("bad: %v", nodes)
|
||||
}
|
||||
if !strContains(nodes[0].ServiceTags, "master") {
|
||||
if !lib.StrContains(nodes[0].ServiceTags, "master") {
|
||||
t.Fatalf("bad: %v", nodes)
|
||||
}
|
||||
if nodes[0].ServicePort != 8000 {
|
||||
@ -1375,7 +1366,7 @@ func TestStateStore_ServiceTagNodes_MultipleTags(t *testing.T) {
|
||||
if nodes[0].Address != "127.0.0.1" {
|
||||
t.Fatalf("bad: %v", nodes)
|
||||
}
|
||||
if !strContains(nodes[0].ServiceTags, "master") {
|
||||
if !lib.StrContains(nodes[0].ServiceTags, "master") {
|
||||
t.Fatalf("bad: %v", nodes)
|
||||
}
|
||||
if nodes[0].ServicePort != 8000 {
|
||||
@ -1409,7 +1400,7 @@ func TestStateStore_ServiceTagNodes_MultipleTags(t *testing.T) {
|
||||
if nodes[0].Address != "127.0.0.1" {
|
||||
t.Fatalf("bad: %v", nodes)
|
||||
}
|
||||
if !strContains(nodes[0].ServiceTags, "dev") {
|
||||
if !lib.StrContains(nodes[0].ServiceTags, "dev") {
|
||||
t.Fatalf("bad: %v", nodes)
|
||||
}
|
||||
if nodes[0].ServicePort != 8001 {
|
||||
|
@ -1,17 +1,13 @@
|
||||
package consul
|
||||
|
||||
import (
|
||||
crand "crypto/rand"
|
||||
"encoding/binary"
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"net"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"runtime"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/serf/serf"
|
||||
)
|
||||
@ -83,24 +79,6 @@ func init() {
|
||||
privateBlocks[5] = block
|
||||
}
|
||||
|
||||
// strContains checks if a list contains a string
|
||||
func strContains(l []string, s string) bool {
|
||||
for _, v := range l {
|
||||
if v == s {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func ToLowerList(l []string) []string {
|
||||
var out []string
|
||||
for _, value := range l {
|
||||
out = append(out, strings.ToLower(value))
|
||||
}
|
||||
return out
|
||||
}
|
||||
|
||||
// ensurePath is used to make sure a path exists
|
||||
func ensurePath(path string, dir bool) error {
|
||||
if !dir {
|
||||
@ -309,23 +287,3 @@ func runtimeStats() map[string]string {
|
||||
"cpu_count": strconv.FormatInt(int64(runtime.NumCPU()), 10),
|
||||
}
|
||||
}
|
||||
|
||||
// generateUUID is used to generate a random UUID
|
||||
func generateUUID() string {
|
||||
buf := make([]byte, 16)
|
||||
if _, err := crand.Read(buf); err != nil {
|
||||
panic(fmt.Errorf("failed to read random bytes: %v", err))
|
||||
}
|
||||
|
||||
return fmt.Sprintf("%08x-%04x-%04x-%04x-%12x",
|
||||
buf[0:4],
|
||||
buf[4:6],
|
||||
buf[6:8],
|
||||
buf[8:10],
|
||||
buf[10:16])
|
||||
}
|
||||
|
||||
// Returns a random stagger interval between 0 and the duration
|
||||
func randomStagger(intv time.Duration) time.Duration {
|
||||
return time.Duration(uint64(rand.Int63()) % uint64(intv))
|
||||
}
|
||||
|
@ -6,30 +6,10 @@ import (
|
||||
"net"
|
||||
"regexp"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/serf/serf"
|
||||
)
|
||||
|
||||
func TestStrContains(t *testing.T) {
|
||||
l := []string{"a", "b", "c"}
|
||||
if !strContains(l, "b") {
|
||||
t.Fatalf("should contain")
|
||||
}
|
||||
if strContains(l, "d") {
|
||||
t.Fatalf("should not contain")
|
||||
}
|
||||
}
|
||||
|
||||
func TestToLowerList(t *testing.T) {
|
||||
l := []string{"ABC", "Abc", "abc"}
|
||||
for _, value := range ToLowerList(l) {
|
||||
if value != "abc" {
|
||||
t.Fatalf("failed lowercasing")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestGetPrivateIP(t *testing.T) {
|
||||
ip, _, err := net.ParseCIDR("10.1.2.3/32")
|
||||
if err != nil {
|
||||
@ -295,13 +275,3 @@ func TestGenerateUUID(t *testing.T) {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestRandomStagger(t *testing.T) {
|
||||
intv := time.Minute
|
||||
for i := 0; i < 10; i++ {
|
||||
stagger := randomStagger(intv)
|
||||
if stagger < 0 || stagger >= intv {
|
||||
t.Fatalf("Bad: %v", stagger)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1,4 +1,4 @@
|
||||
#!/bin/bash
|
||||
#!/usr/bin/env bash
|
||||
set -e
|
||||
|
||||
ZSH_FUNC_DIR="/usr/share/zsh/site-functions"
|
||||
|
23
lib/cluster.go
Normal file
23
lib/cluster.go
Normal file
@ -0,0 +1,23 @@
|
||||
package lib
|
||||
|
||||
import (
|
||||
"math/rand"
|
||||
"time"
|
||||
)
|
||||
|
||||
// Returns a random stagger interval between 0 and the duration
|
||||
func RandomStagger(intv time.Duration) time.Duration {
|
||||
return time.Duration(uint64(rand.Int63()) % uint64(intv))
|
||||
}
|
||||
|
||||
// RateScaledInterval is used to choose an interval to perform an action in
|
||||
// order to target an aggregate number of actions per second across the whole
|
||||
// cluster.
|
||||
func RateScaledInterval(rate float64, min time.Duration, n int) time.Duration {
|
||||
interval := time.Duration(float64(time.Second) * float64(n) / rate)
|
||||
if interval < min {
|
||||
return min
|
||||
}
|
||||
|
||||
return interval
|
||||
}
|
39
lib/cluster_test.go
Normal file
39
lib/cluster_test.go
Normal file
@ -0,0 +1,39 @@
|
||||
package lib
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestRandomStagger(t *testing.T) {
|
||||
intv := time.Minute
|
||||
for i := 0; i < 10; i++ {
|
||||
stagger := RandomStagger(intv)
|
||||
if stagger < 0 || stagger >= intv {
|
||||
t.Fatalf("Bad: %v", stagger)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestRateScaledInterval(t *testing.T) {
|
||||
min := 1 * time.Second
|
||||
rate := 200.0
|
||||
if v := RateScaledInterval(rate, min, 0); v != min {
|
||||
t.Fatalf("Bad: %v", v)
|
||||
}
|
||||
if v := RateScaledInterval(rate, min, 100); v != min {
|
||||
t.Fatalf("Bad: %v", v)
|
||||
}
|
||||
if v := RateScaledInterval(rate, min, 200); v != 1*time.Second {
|
||||
t.Fatalf("Bad: %v", v)
|
||||
}
|
||||
if v := RateScaledInterval(rate, min, 1000); v != 5*time.Second {
|
||||
t.Fatalf("Bad: %v", v)
|
||||
}
|
||||
if v := RateScaledInterval(rate, min, 5000); v != 25*time.Second {
|
||||
t.Fatalf("Bad: %v", v)
|
||||
}
|
||||
if v := RateScaledInterval(rate, min, 10000); v != 50*time.Second {
|
||||
t.Fatalf("Bad: %v", v)
|
||||
}
|
||||
}
|
18
lib/rand.go
Normal file
18
lib/rand.go
Normal file
@ -0,0 +1,18 @@
|
||||
package lib
|
||||
|
||||
import (
|
||||
"math/rand"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
var (
|
||||
once sync.Once
|
||||
)
|
||||
|
||||
// SeedMathRand provides weak, but guaranteed seeding, which is better than
|
||||
// running with Go's default seed of 1. A call to SeedMathRand() is expected
|
||||
// to be called via init(), but never a second time.
|
||||
func SeedMathRand() {
|
||||
once.Do(func() { rand.Seed(time.Now().UTC().UnixNano()) })
|
||||
}
|
11
lib/string.go
Normal file
11
lib/string.go
Normal file
@ -0,0 +1,11 @@
|
||||
package lib
|
||||
|
||||
// StrContains checks if a list contains a string
|
||||
func StrContains(l []string, s string) bool {
|
||||
for _, v := range l {
|
||||
if v == s {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
15
lib/string_test.go
Normal file
15
lib/string_test.go
Normal file
@ -0,0 +1,15 @@
|
||||
package lib
|
||||
|
||||
import (
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestStrContains(t *testing.T) {
|
||||
l := []string{"a", "b", "c"}
|
||||
if !StrContains(l, "b") {
|
||||
t.Fatalf("should contain")
|
||||
}
|
||||
if StrContains(l, "d") {
|
||||
t.Fatalf("should not contain")
|
||||
}
|
||||
}
|
6
main.go
6
main.go
@ -6,8 +6,14 @@ import (
|
||||
"io/ioutil"
|
||||
"log"
|
||||
"os"
|
||||
|
||||
"github.com/hashicorp/consul/lib"
|
||||
)
|
||||
|
||||
func init() {
|
||||
lib.SeedMathRand()
|
||||
}
|
||||
|
||||
func main() {
|
||||
os.Exit(realMain())
|
||||
}
|
||||
|
@ -1,4 +1,4 @@
|
||||
#!/bin/bash
|
||||
#!/usr/bin/env bash
|
||||
#
|
||||
# This script builds the application from source for multiple platforms.
|
||||
set -e
|
||||
|
@ -1,4 +1,4 @@
|
||||
#!/bin/bash
|
||||
#!/usr/bin/env bash
|
||||
set -e
|
||||
|
||||
# Get the version from the command line
|
||||
|
@ -1,12 +1,12 @@
|
||||
#!/bin/bash
|
||||
#!/usr/bin/env bash
|
||||
|
||||
grep generateUUID consul/state/state_store.go
|
||||
grep GenerateUUID consul/state/state_store.go
|
||||
RESULT=$?
|
||||
if [ $RESULT -eq 0 ]; then
|
||||
exit 1
|
||||
fi
|
||||
|
||||
grep generateUUID consul/fsm.go
|
||||
grep GenerateUUID consul/fsm.go
|
||||
RESULT=$?
|
||||
if [ $RESULT -eq 0 ]; then
|
||||
exit 1
|
||||
|
@ -5,10 +5,10 @@ setlocal
|
||||
if not exist %1\consul\state\state_store.go exit /B 1
|
||||
if not exist %1\consul\fsm.go exit /B 1
|
||||
|
||||
findstr /R generateUUID %1\consul\state\state_store.go 1>nul
|
||||
findstr /R GenerateUUID %1\consul\state\state_store.go 1>nul
|
||||
if not %ERRORLEVEL% EQU 1 exit /B 1
|
||||
|
||||
findstr generateUUID %1\consul\fsm.go 1>nul
|
||||
findstr GenerateUUID %1\consul\fsm.go 1>nul
|
||||
if not %ERRORLEVEL% EQU 1 exit /B 1
|
||||
|
||||
exit /B 0
|
||||
|
@ -34,7 +34,7 @@
|
||||
"bundle check || bundle install --jobs 7",
|
||||
"bundle exec middleman build",
|
||||
|
||||
"/bin/bash ./scripts/deploy.sh"
|
||||
"/usr/bin/env bash ./scripts/deploy.sh"
|
||||
]
|
||||
}
|
||||
]
|
||||
|
@ -1,4 +1,4 @@
|
||||
#!/bin/bash
|
||||
#!/usr/bin/env bash
|
||||
set -e
|
||||
|
||||
PROJECT="consul"
|
||||
|
@ -299,7 +299,7 @@ a JSON body will be returned like this:
|
||||
},
|
||||
"Datacenter": "dc3",
|
||||
"Failovers": 2
|
||||
}
|
||||
}]
|
||||
}
|
||||
```
|
||||
|
||||
|
@ -73,13 +73,56 @@ DNS on port 8600.
|
||||
|
||||
### Dnsmasq Setup
|
||||
|
||||
Dnsmasq is typically configured via files in the `/etc/dnsmasq.d` directory. To configure Consul, create the file `/etc/dnsmasq.d/10-consul` with the following contents:
|
||||
Dnsmasq is typically configured via a `dnsmasq.conf` or a series of files in
|
||||
the `/etc/dnsmasq.d` directory. In Dnsmasq's configuration file
|
||||
(e.g. `/etc/dnsmasq.d/10-consul`), add the following:
|
||||
|
||||
```text
|
||||
# Enable forward lookup of the 'consul' domain:
|
||||
server=/consul/127.0.0.1#8600
|
||||
|
||||
# Uncomment and modify as appropriate to enable reverse DNS lookups for
|
||||
# common netblocks found in RFC 1918, 5735, and 6598:
|
||||
#rev-server=0.0.0.0/8,127.0.0.1#8600
|
||||
#rev-server=10.0.0.0/8,127.0.0.1#8600
|
||||
#rev-server=100.64.0.0/10,127.0.0.1#8600
|
||||
#rev-server=127.0.0.1/8,127.0.0.1#8600
|
||||
#rev-server=169.254.0.0/16,127.0.0.1#8600
|
||||
#rev-server=172.16.0.0/12,127.0.0.1#8600
|
||||
#rev-server=192.168.0.0/16,127.0.0.1#8600
|
||||
#rev-server=224.0.0.0/4,127.0.0.1#8600
|
||||
#rev-server=240.0.0.0/4,127.0.0.1#8600
|
||||
```
|
||||
|
||||
Once that configuration is created, restart the dnsmasq service.
|
||||
Once that configuration is created, restart the `dnsmasq` service.
|
||||
|
||||
Additional useful settings in `dnsmasq` to consider include (see
|
||||
[`dnsmasq(8)`](http://www.thekelleys.org.uk/dnsmasq/docs/dnsmasq-man.html)
|
||||
for additional details):
|
||||
|
||||
```
|
||||
# Accept DNS queries only from hosts whose address is on a local subnet.
|
||||
#local-service
|
||||
|
||||
# Don't poll /etc/resolv.conf for changes.
|
||||
#no-poll
|
||||
|
||||
# Don't read /etc/resolv.conf. Get upstream servers only from the command
|
||||
# line or the dnsmasq configuration file (see the "server" directive below).
|
||||
#no-resolv
|
||||
|
||||
# Specify IP address(es) of other DNS servers for queries not handled
|
||||
# directly by consul. There is normally one 'server' entry set for every
|
||||
# 'nameserver' parameter found in '/etc/resolv.conf'. See dnsmasq(8)'s
|
||||
# 'server' configuration option for details.
|
||||
#server=1.2.3.4
|
||||
#server=208.67.222.222
|
||||
#server=8.8.8.8
|
||||
|
||||
# Set the size of dnsmasq's cache. The default is 150 names. Setting the
|
||||
# cache size to zero disables caching.
|
||||
#cache-size=65536
|
||||
```
|
||||
|
||||
### Testing
|
||||
|
||||
@ -107,7 +150,8 @@ master.redis.service.dc-1.consul. 0 IN A 172.31.3.234
|
||||
;; MSG SIZE rcvd: 76
|
||||
```
|
||||
|
||||
Then run the same query against your BIND instance and make sure you get a result:
|
||||
Then run the same query against your BIND instance and make sure you get a
|
||||
valid result:
|
||||
|
||||
```text
|
||||
[root@localhost ~]# dig @localhost -p 53 master.redis.service.dc-1.consul. A
|
||||
@ -131,10 +175,40 @@ master.redis.service.dc-1.consul. 0 IN A 172.31.3.234
|
||||
;; MSG SIZE rcvd: 76
|
||||
```
|
||||
|
||||
If desired, verify reverse DNS using the same methodology:
|
||||
|
||||
```text
|
||||
[root@localhost ~]# dig @127.0.0.1 -p 8600 133.139.16.172.in-addr.arpa. PTR
|
||||
|
||||
; <<>> DiG 9.10.3-P3 <<>> @127.0.0.1 -p 8600 133.139.16.172.in-addr.arpa. PTR
|
||||
; (1 server found)
|
||||
;; global options: +cmd
|
||||
;; Got answer:
|
||||
;; ->>HEADER<<- opcode: QUERY, status: NOERROR, id: 3713
|
||||
;; flags: qr aa rd; QUERY: 1, ANSWER: 1, AUTHORITY: 0, ADDITIONAL: 0
|
||||
;; WARNING: recursion requested but not available
|
||||
|
||||
;; QUESTION SECTION:
|
||||
;133.139.16.172.in-addr.arpa. IN PTR
|
||||
|
||||
;; ANSWER SECTION:
|
||||
133.139.16.172.in-addr.arpa. 0 IN PTR consul1.node.dc1.consul.
|
||||
|
||||
;; Query time: 3 msec
|
||||
;; SERVER: 127.0.0.1#8600(127.0.0.1)
|
||||
;; WHEN: Sun Jan 31 04:25:39 UTC 2016
|
||||
;; MSG SIZE rcvd: 109
|
||||
[root@localhost ~]# dig @127.0.0.1 +short -x 172.16.139.133
|
||||
consul1.node.dc1.consul.
|
||||
```
|
||||
|
||||
### Troubleshooting
|
||||
|
||||
If you don't get an answer from BIND but you do get an answer from Consul, your
|
||||
best bet is to turn on BIND's query log to see what's happening:
|
||||
If you don't get an answer from your DNS server (e.g. BIND, Dnsmasq) but you
|
||||
do get an answer from Consul, your best bet is to turn on your DNS server's
|
||||
query log to see what's happening.
|
||||
|
||||
For BIND:
|
||||
|
||||
```text
|
||||
[root@localhost ~]# rndc querylog
|
||||
@ -152,3 +226,6 @@ This indicates that DNSSEC is not disabled properly.
|
||||
|
||||
If you see errors about network connections, verify that there are no firewall
|
||||
or routing problems between the servers running BIND and Consul.
|
||||
|
||||
For Dnsmasq, see the `log-queries` configuration option and the `USR1`
|
||||
signal.
|
||||
|
@ -105,6 +105,9 @@ description: |-
|
||||
<li>
|
||||
<a href="http://xordataexchange.github.io/crypt/">crypt</a> - Store and retrieve encrypted configuration parameters from etcd or Consul
|
||||
</li>
|
||||
<li>
|
||||
<a href="https://github.com/smoketurner/dropwizard-consul">Dropwizard Consul Bundle</a> - Service discovery and configuration integration with the <a href="http://www.dropwizard.io/">Dropwizard</a> framework
|
||||
</li>
|
||||
<li>
|
||||
<a href="https://github.com/progrium/docker-consul">docker-consul</a> - Dockerized Consul Agent
|
||||
</li>
|
||||
|
@ -110,7 +110,7 @@ Armons-MacBook-Air.node.dc1.consul. 0 IN A 172.20.20.11
|
||||
```
|
||||
|
||||
The `SRV` record says that the web service is running on port 80 and exists on
|
||||
the node `agent-one.node.dc1.consul.`. An additional section is returned by the
|
||||
the node `Armons-MacBook-Air.node.dc1.consul.`. An additional section is returned by the
|
||||
DNS with the `A` record for that node.
|
||||
|
||||
Finally, we can also use the DNS API to filter services by tags. The
|
||||
|
@ -8,7 +8,7 @@ description: |-
|
||||
|
||||
# Consul & the HashiCorp Ecosystem
|
||||
|
||||
HashiCorp is the creator of the open source projects Vagrant, Packer, Terraform, Serf, and Consul, and the commercial product Atlas. Terraform is just one piece of the ecosystem HashiCorp has built to make application delivery a versioned, auditable, repeatable, and collaborative process. To learn more about our beliefs on the qualities of the modern datacenter and responsible application delivery, read [The Atlas Mindset: Version Control for Infrastructure](https://www.hashicorp.com/blog/atlas-mindset.html?utm_source=consul&utm_campaign=HashicorpEcosystem).
|
||||
HashiCorp is the creator of the open source projects Vagrant, Packer, Terraform, Serf, and Consul, and the commercial product Atlas. Consul is just one piece of the ecosystem HashiCorp has built to make application delivery a versioned, auditable, repeatable, and collaborative process. To learn more about our beliefs on the qualities of the modern datacenter and responsible application delivery, read [The Atlas Mindset: Version Control for Infrastructure](https://www.hashicorp.com/blog/atlas-mindset.html?utm_source=consul&utm_campaign=HashicorpEcosystem).
|
||||
|
||||
If you are using Consul for service discovery, it’s likely that you have a system to deploy infrastructure which Consul is then connecting. Terraform is our tool for creating, combining, and modifying infrastructure.
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user