mirror of https://github.com/status-im/consul.git

commit e4aef3a49a: Get latest updates

CHANGELOG.md | 12
@@ -1,5 +1,17 @@
 ## 0.7.0 (UNRELEASED)

+IMPROVEMENTS:
+
+* Consul agents will now periodically reconnect to available Consul servers
+  in order to redistribute their RPC query load. Consul clients will, by
+  default, attempt to establish a new connection every 120s to 180s unless
+  the size of the cluster is sufficiently large. The rate at which agents
+  begin to query new servers is proportional to the size of the Consul
+  cluster (servers should never receive more than 64 new connections per
+  second per Consul server as a result of rebalancing). Clusters in stable
+  environments that use `allow_stale` should see a more even distribution of
+  query load across all of their Consul servers. [GH-1743]
+
 ## 0.6.4 (March 16, 2016)

 BACKWARDS INCOMPATIBILITIES:
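The changelog entry above compresses the arithmetic implemented later in this commit's server_manager package. As a rough sketch of how the rebalance window scales with cluster size (the function name and the max-jitter simplification are illustrative assumptions, not code from this commit):

package main

import (
	"fmt"
	"time"
)

// rebalanceInterval approximates the window described above: each agent
// reuses a connection for at least 120s plus up to 60s of jitter, and the
// cluster-wide rebalance rate is capped at 64 new connections/sec/server.
func rebalanceInterval(numNodes, numServers int) time.Duration {
	const minReuse = 120 * time.Second
	const maxJitter = 60 * time.Second // the real code draws a random stagger
	clusterWideConnsPerSec := float64(numServers * 64)
	scaled := time.Duration(float64(time.Second) * float64(numNodes) / clusterWideConnsPerSec)
	if scaled < minReuse+maxJitter {
		return minReuse + maxJitter // small clusters stay in the 120s-180s window
	}
	return scaled
}

func main() {
	fmt.Println(rebalanceInterval(100, 3))    // 3m0s: the 120s-180s window dominates
	fmt.Println(rebalanceInterval(100000, 5)) // 5m12.5s: the 64 conns/sec cap dominates
}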
Godeps/Godeps.json

@@ -28,8 +28,8 @@
 		},
 		{
 			"ImportPath": "github.com/boltdb/bolt",
-			"Comment": "v1.1.0-65-gee4a088",
-			"Rev": "ee4a0888a9abe7eefe5a0992ca4cb06864839873"
+			"Comment": "v1.2.0",
+			"Rev": "c6ba97b89e0454fec9aa92e1d33a4e2c5fc1f631"
 		},
 		{
 			"ImportPath": "github.com/elazarl/go-bindata-assetfs",
api/agent.go | 12

@@ -257,12 +257,12 @@ type checkUpdate struct {
 // required to use this API).
 func (a *Agent) UpdateTTL(checkID, output, status string) error {
 	switch status {
-	case "pass", "passing":
-		status = "passing"
-	case "warn", "warning":
-		status = "warning"
-	case "fail", "critical":
-		status = "critical"
+	case "pass", HealthPassing:
+		status = HealthPassing
+	case "warn", HealthWarning:
+		status = HealthWarning
+	case "fail", HealthCritical:
+		status = HealthCritical
 	default:
 		return fmt.Errorf("Invalid status: %s", status)
 	}
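For reference, a hypothetical caller of the updated method from outside the package; the string aliases and the new constants now normalize to the same values ("service:web" is a placeholder TTL check ID):

package main

import (
	"log"

	"github.com/hashicorp/consul/api"
)

func main() {
	client, err := api.NewClient(api.DefaultConfig())
	if err != nil {
		log.Fatal(err)
	}
	// "pass" and api.HealthPassing now normalize to the same value.
	if err := client.Agent().UpdateTTL("service:web", "all good", api.HealthPassing); err != nil {
		log.Fatal(err)
	}
}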
api/api.go | 83

@@ -3,9 +3,11 @@ package api
 import (
 	"bytes"
 	"crypto/tls"
+	"crypto/x509"
 	"encoding/json"
 	"fmt"
 	"io"
+	"io/ioutil"
 	"log"
 	"net"
 	"net/http"
@@ -122,6 +124,30 @@ type Config struct {
 	Token string
 }

+// TLSConfig is used to generate a TLSClientConfig that's useful for talking to
+// Consul using TLS.
+type TLSConfig struct {
+	// Address is the optional address of the Consul server. The port, if any,
+	// will be removed from here and this will be set to the ServerName of the
+	// resulting config.
+	Address string
+
+	// CAFile is the optional path to the CA certificate used for Consul
+	// communication, defaults to the system bundle if not specified.
+	CAFile string
+
+	// CertFile is the optional path to the certificate for Consul
+	// communication. If this is set then you need to also set KeyFile.
+	CertFile string
+
+	// KeyFile is the optional path to the private key for Consul communication.
+	// If this is set then you need to also set CertFile.
+	KeyFile string
+
+	// InsecureSkipVerify if set to true will disable TLS host verification.
+	InsecureSkipVerify bool
+}
+
 // DefaultConfig returns a default configuration for the client. By default this
 // will pool and reuse idle connections to Consul. If you have a long-lived
 // client object, this is the desired behavior and should make the most efficient
@@ -194,10 +220,19 @@ func defaultConfig(transportFn func() *http.Transport) *Config {
 	}

 	if !doVerify {
-		transport := transportFn()
-		transport.TLSClientConfig = &tls.Config{
+		tlsClientConfig, err := SetupTLSConfig(&TLSConfig{
 			InsecureSkipVerify: true,
-		}
+		})
+
+		// We don't expect this to fail given that we aren't
+		// parsing any of the input, but we panic just in case
+		// since this doesn't have an error return.
+		if err != nil {
+			panic(err)
+		}
+
+		transport := transportFn()
+		transport.TLSClientConfig = tlsClientConfig
 		config.HttpClient.Transport = transport
 	}
 }
@@ -205,6 +240,50 @@ func defaultConfig(transportFn func() *http.Transport) *Config {
 	return config
 }

+// SetupTLSConfig is used to generate a TLSClientConfig that's useful for
+// talking to Consul using TLS.
+func SetupTLSConfig(tlsConfig *TLSConfig) (*tls.Config, error) {
+	tlsClientConfig := &tls.Config{
+		InsecureSkipVerify: tlsConfig.InsecureSkipVerify,
+	}
+
+	if tlsConfig.Address != "" {
+		server := tlsConfig.Address
+		hasPort := strings.LastIndex(server, ":") > strings.LastIndex(server, "]")
+		if hasPort {
+			var err error
+			server, _, err = net.SplitHostPort(server)
+			if err != nil {
+				return nil, err
+			}
+		}
+		tlsClientConfig.ServerName = server
+	}
+
+	if tlsConfig.CertFile != "" && tlsConfig.KeyFile != "" {
+		tlsCert, err := tls.LoadX509KeyPair(tlsConfig.CertFile, tlsConfig.KeyFile)
+		if err != nil {
+			return nil, err
+		}
+		tlsClientConfig.Certificates = []tls.Certificate{tlsCert}
+	}
+
+	if tlsConfig.CAFile != "" {
+		data, err := ioutil.ReadFile(tlsConfig.CAFile)
+		if err != nil {
+			return nil, fmt.Errorf("failed to read CA file: %v", err)
+		}
+
+		caPool := x509.NewCertPool()
+		if !caPool.AppendCertsFromPEM(data) {
+			return nil, fmt.Errorf("failed to parse CA certificate")
+		}
+		tlsClientConfig.RootCAs = caPool
+	}
+
+	return tlsClientConfig, nil
+}
+
 // Client provides a client to the Consul API
 type Client struct {
 	config Config
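A minimal sketch of calling the newly exported helper directly; the address and certificate paths are placeholders, not values from this commit:

package main

import (
	"log"
	"net/http"

	"github.com/hashicorp/consul/api"
)

func main() {
	tlsClientConfig, err := api.SetupTLSConfig(&api.TLSConfig{
		Address:  "consul.example.com:8501", // port is stripped before ServerName is set
		CAFile:   "ca.pem",                  // placeholder paths
		CertFile: "client.pem",
		KeyFile:  "client-key.pem",
	})
	if err != nil {
		log.Fatal(err)
	}

	// Wire the resulting *tls.Config into the HTTP client used by api.Config.
	cfg := api.DefaultConfig()
	cfg.HttpClient.Transport = &http.Transport{TLSClientConfig: tlsClientConfig}
	if _, err := api.NewClient(cfg); err != nil {
		log.Fatal(err)
	}
}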
api/api_test.go

@@ -2,11 +2,13 @@ package api

 import (
 	crand "crypto/rand"
+	"crypto/tls"
 	"fmt"
 	"io/ioutil"
 	"net/http"
 	"os"
 	"path/filepath"
+	"reflect"
 	"runtime"
 	"testing"
 	"time"
@@ -121,6 +123,98 @@ func TestDefaultConfig_env(t *testing.T) {
 	}
 }

+func TestSetupTLSConfig(t *testing.T) {
+	// A default config should result in a clean default client config.
+	tlsConfig := &TLSConfig{}
+	cc, err := SetupTLSConfig(tlsConfig)
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
+	expected := &tls.Config{}
+	if !reflect.DeepEqual(cc, expected) {
+		t.Fatalf("bad: %v", cc)
+	}
+
+	// Try some address variations with and without ports.
+	tlsConfig.Address = "127.0.0.1"
+	cc, err = SetupTLSConfig(tlsConfig)
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
+	expected.ServerName = "127.0.0.1"
+	if !reflect.DeepEqual(cc, expected) {
+		t.Fatalf("bad: %v", cc)
+	}
+
+	tlsConfig.Address = "127.0.0.1:80"
+	cc, err = SetupTLSConfig(tlsConfig)
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
+	expected.ServerName = "127.0.0.1"
+	if !reflect.DeepEqual(cc, expected) {
+		t.Fatalf("bad: %v", cc)
+	}
+
+	tlsConfig.Address = "demo.consul.io:80"
+	cc, err = SetupTLSConfig(tlsConfig)
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
+	expected.ServerName = "demo.consul.io"
+	if !reflect.DeepEqual(cc, expected) {
+		t.Fatalf("bad: %v", cc)
+	}
+
+	tlsConfig.Address = "[2001:db8:a0b:12f0::1]"
+	cc, err = SetupTLSConfig(tlsConfig)
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
+	expected.ServerName = "[2001:db8:a0b:12f0::1]"
+	if !reflect.DeepEqual(cc, expected) {
+		t.Fatalf("bad: %v", cc)
+	}
+
+	tlsConfig.Address = "[2001:db8:a0b:12f0::1]:80"
+	cc, err = SetupTLSConfig(tlsConfig)
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
+	expected.ServerName = "2001:db8:a0b:12f0::1"
+	if !reflect.DeepEqual(cc, expected) {
+		t.Fatalf("bad: %v", cc)
+	}
+
+	// Skip verification.
+	tlsConfig.InsecureSkipVerify = true
+	cc, err = SetupTLSConfig(tlsConfig)
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
+	expected.InsecureSkipVerify = true
+	if !reflect.DeepEqual(cc, expected) {
+		t.Fatalf("bad: %v", cc)
+	}
+
+	// Make a new config that hits all the file parsers.
+	tlsConfig = &TLSConfig{
+		CertFile: "../test/hostname/Alice.crt",
+		KeyFile:  "../test/hostname/Alice.key",
+		CAFile:   "../test/hostname/CertAuth.crt",
+	}
+	cc, err = SetupTLSConfig(tlsConfig)
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
+	if len(cc.Certificates) != 1 {
+		t.Fatalf("missing certificate: %v", cc.Certificates)
+	}
+	if cc.RootCAs == nil {
+		t.Fatalf("didn't load root CAs")
+	}
+}
+
 func TestSetQueryOptions(t *testing.T) {
 	t.Parallel()
 	c, s := makeClient(t)
api/health.go

@@ -4,6 +4,16 @@ import (
 	"fmt"
 )

+const (
+	// HealthAny is special, and is used as a wild card,
+	// not as a specific state.
+	HealthAny      = "any"
+	HealthUnknown  = "unknown"
+	HealthPassing  = "passing"
+	HealthWarning  = "warning"
+	HealthCritical = "critical"
+)
+
 // HealthCheck is used to represent a single check
 type HealthCheck struct {
 	Node string
@@ -85,7 +95,7 @@ func (h *Health) Service(service, tag string, passingOnly bool, q *QueryOptions)
 		r.params.Set("tag", tag)
 	}
 	if passingOnly {
-		r.params.Set("passing", "1")
+		r.params.Set(HealthPassing, "1")
 	}
 	rtt, resp, err := requireOK(h.c.doRequest(r))
 	if err != nil {
@@ -108,11 +118,11 @@ func (h *Health) Service(service, tag string, passingOnly bool, q *QueryOptions)
 // The wildcard "any" state can also be used for all checks.
 func (h *Health) State(state string, q *QueryOptions) ([]*HealthCheck, *QueryMeta, error) {
 	switch state {
-	case "any":
-	case "warning":
-	case "critical":
-	case "passing":
-	case "unknown":
+	case HealthAny:
+	case HealthWarning:
+	case HealthCritical:
+	case HealthPassing:
+	case HealthUnknown:
 	default:
 		return nil, nil, fmt.Errorf("Unsupported state: %v", state)
 	}
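A short usage sketch of the constant-based states from outside the package:

package main

import (
	"log"

	"github.com/hashicorp/consul/api"
)

func main() {
	client, err := api.NewClient(api.DefaultConfig())
	if err != nil {
		log.Fatal(err)
	}
	// Query all checks in the critical state using the new constant.
	checks, meta, err := client.Health().State(api.HealthCritical, nil)
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("%d critical checks (index %d)", len(checks), meta.LastIndex)
}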
command/agent/catalog_endpoint_test.go

@@ -271,7 +271,7 @@ func TestCatalogNodes_DistanceSort(t *testing.T) {
 	if err := srv.agent.RPC("Coordinate.Update", &arg, &out); err != nil {
 		t.Fatalf("err: %v", err)
 	}
-	time.Sleep(200 * time.Millisecond)
+	time.Sleep(300 * time.Millisecond)

 	// Query again and now foo should have moved to the front of the line.
 	req, err = http.NewRequest("GET", "/v1/catalog/nodes?dc=dc1&near=foo", nil)

@@ -482,7 +482,7 @@ func TestCatalogServiceNodes_DistanceSort(t *testing.T) {
 	if err := srv.agent.RPC("Coordinate.Update", &arg, &out); err != nil {
 		t.Fatalf("err: %v", err)
 	}
-	time.Sleep(200 * time.Millisecond)
+	time.Sleep(300 * time.Millisecond)

 	// Query again and now foo should have moved to the front of the line.
 	req, err = http.NewRequest("GET", "/v1/catalog/service/api?tag=a&near=foo", nil)

command/agent/config.go

@@ -360,7 +360,7 @@ type Config struct {
 	// * deny - Deny all requests
 	// * extend-cache - Ignore the cache expiration, and allow cached
 	//                  ACLs to be used to service requests. This
-	//                 is the default. If the ACL is not in the cache,
+	//                  is the default. If the ACL is not in the cache,
 	//                  this acts like deny.
 	ACLDownPolicy string `mapstructure:"acl_down_policy"`
command/agent/coordinate_endpoint_test.go

@@ -99,7 +99,7 @@ func TestCoordinate_Nodes(t *testing.T) {
 	if err := srv.agent.RPC("Coordinate.Update", &arg2, &out); err != nil {
 		t.Fatalf("err: %v", err)
 	}
-	time.Sleep(200 * time.Millisecond)
+	time.Sleep(300 * time.Millisecond)

 	// Query back and check the nodes are present and sorted correctly.
 	req, err = http.NewRequest("GET", "/v1/coordinate/nodes?dc=dc1", nil)

command/agent/health_endpoint_test.go

@@ -126,7 +126,7 @@ func TestHealthChecksInState_DistanceSort(t *testing.T) {
 	if err := srv.agent.RPC("Coordinate.Update", &arg, &out); err != nil {
 		t.Fatalf("err: %v", err)
 	}
-	time.Sleep(200 * time.Millisecond)
+	time.Sleep(300 * time.Millisecond)

 	// Query again and now foo should have moved to the front of the line.
 	resp = httptest.NewRecorder()

@@ -320,7 +320,7 @@ func TestHealthServiceChecks_DistanceSort(t *testing.T) {
 	if err := srv.agent.RPC("Coordinate.Update", &arg, &out); err != nil {
 		t.Fatalf("err: %v", err)
 	}
-	time.Sleep(200 * time.Millisecond)
+	time.Sleep(300 * time.Millisecond)

 	// Query again and now foo should have moved to the front of the line.
 	resp = httptest.NewRecorder()

@@ -487,7 +487,7 @@ func TestHealthServiceNodes_DistanceSort(t *testing.T) {
 	if err := srv.agent.RPC("Coordinate.Update", &arg, &out); err != nil {
 		t.Fatalf("err: %v", err)
 	}
-	time.Sleep(200 * time.Millisecond)
+	time.Sleep(300 * time.Millisecond)

 	// Query again and now foo should have moved to the front of the line.
 	resp = httptest.NewRecorder()
consul/catalog_endpoint_test.go

@@ -281,20 +281,13 @@ func TestCatalogListDatacenters_DistanceSort(t *testing.T) {
 	// coordinates, so the best we can do is make sure that the sorting
 	// function is getting called (it's tested extensively in rtt_test.go).
 	// Since this is relative to dc1, it will be listed first (proving we
-	// went into the sort fn) and the other two will be sorted by name since
-	// there are no known coordinates for them.
+	// went into the sort fn).
 	if len(out) != 3 {
 		t.Fatalf("bad: %v", out)
 	}
 	if out[0] != "dc1" {
 		t.Fatalf("bad: %v", out)
 	}
-	if out[1] != "acdc" {
-		t.Fatalf("bad: %v", out)
-	}
-	if out[2] != "dc2" {
-		t.Fatalf("bad: %v", out)
-	}
 }

 func TestCatalogListNodes(t *testing.T) {
consul/client.go | 115

@@ -3,7 +3,6 @@ package consul
 import (
 	"fmt"
 	"log"
-	"math/rand"
 	"os"
 	"path/filepath"
 	"strconv"
@@ -11,19 +10,35 @@ import (
 	"sync"
 	"time"

+	"github.com/hashicorp/consul/consul/server_details"
+	"github.com/hashicorp/consul/consul/server_manager"
 	"github.com/hashicorp/consul/consul/structs"
 	"github.com/hashicorp/serf/coordinate"
 	"github.com/hashicorp/serf/serf"
 )

 const (
-	// clientRPCCache controls how long we keep an idle connection
-	// open to a server
-	clientRPCCache = 30 * time.Second
+	// clientRPCConnMaxIdle controls how long we keep an idle connection
+	// open to a server. 127s was chosen as the first prime above 120s
+	// (arbitrarily chose to use a prime) with the intent of reusing
+	// connections that are used by once-a-minute cron(8) jobs *and* that
+	// use a 60s jitter window (e.g. in vixie cron job execution can
+	// drift by up to 59s per job, or 119s for a once-a-minute cron job).
+	clientRPCConnMaxIdle = 127 * time.Second

 	// clientMaxStreams controls how many idle streams we keep
 	// open to a server
 	clientMaxStreams = 32
+
+	// serfEventBacklog is the maximum number of unprocessed Serf Events
+	// that will be held in queue before new serf events block. A
+	// blocking serf event queue is a bad thing.
+	serfEventBacklog = 256
+
+	// serfEventBacklogWarning is the threshold at which point log
+	// warnings will be emitted indicating a problem when processing serf
+	// events.
+	serfEventBacklogWarning = 200
 )

 // Interface is used to provide either a Client or Server,
@@ -43,19 +58,14 @@ type Client struct {
 	// Connection pool to consul servers
 	connPool *ConnPool

-	// consuls tracks the locally known servers
-	consuls    []*serverParts
-	consulLock sync.RWMutex
+	// serverMgr is responsible for the selection and maintenance of
+	// Consul servers this agent uses for RPC requests
+	serverMgr *server_manager.ServerManager

 	// eventCh is used to receive events from the
 	// serf cluster in the datacenter
 	eventCh chan serf.Event

-	// lastServer is the last server we made an RPC call to,
-	// this is used to re-use the last connection
-	lastServer  *serverParts
-	lastRPCTime time.Time
-
 	// Logger uses the provided LogOutput
 	logger *log.Logger
@@ -103,12 +113,17 @@ func NewClient(config *Config) (*Client, error) {
 	// Create server
 	c := &Client{
 		config:     config,
-		connPool:   NewPool(config.LogOutput, clientRPCCache, clientMaxStreams, tlsWrap),
-		eventCh:    make(chan serf.Event, 256),
+		connPool:   NewPool(config.LogOutput, clientRPCConnMaxIdle, clientMaxStreams, tlsWrap),
+		eventCh:    make(chan serf.Event, serfEventBacklog),
 		logger:     logger,
 		shutdownCh: make(chan struct{}),
 	}

+	c.serverMgr = server_manager.New(c.logger, c.shutdownCh, c.serf)
+
+	// Start maintenance task for serverMgr
+	go c.serverMgr.Start()
+
 	// Start the Serf listeners to prevent a deadlock
 	go c.lanEventHandler()
@@ -215,7 +230,13 @@ func (c *Client) Encrypted() bool {

 // lanEventHandler is used to handle events from the lan Serf cluster
 func (c *Client) lanEventHandler() {
+	var numQueuedEvents int
 	for {
+		numQueuedEvents = len(c.eventCh)
+		if numQueuedEvents > serfEventBacklogWarning {
+			c.logger.Printf("[WARN] consul: number of queued serf events above warning threshold: %d/%d", numQueuedEvents, serfEventBacklogWarning)
+		}
+
 		select {
 		case e := <-c.eventCh:
 			switch e.EventType() {
@@ -240,7 +261,7 @@ func (c *Client) lanEventHandler() {
 // nodeJoin is used to handle join events on the serf cluster
 func (c *Client) nodeJoin(me serf.MemberEvent) {
 	for _, m := range me.Members {
-		ok, parts := isConsulServer(m)
+		ok, parts := server_details.IsConsulServer(m)
 		if !ok {
 			continue
 		}
@@ -250,23 +271,7 @@ func (c *Client) nodeJoin(me serf.MemberEvent) {
 			continue
 		}
 		c.logger.Printf("[INFO] consul: adding server %s", parts)
-
-		// Check if this server is known
-		found := false
-		c.consulLock.Lock()
-		for idx, existing := range c.consuls {
-			if existing.Name == parts.Name {
-				c.consuls[idx] = parts
-				found = true
-				break
-			}
-		}
-
-		// Add to the list if not known
-		if !found {
-			c.consuls = append(c.consuls, parts)
-		}
-		c.consulLock.Unlock()
+		c.serverMgr.AddServer(parts)

 		// Trigger the callback
 		if c.config.ServerUp != nil {
@@ -278,23 +283,12 @@ func (c *Client) nodeJoin(me serf.MemberEvent) {
 // nodeFail is used to handle fail events on the serf cluster
 func (c *Client) nodeFail(me serf.MemberEvent) {
 	for _, m := range me.Members {
-		ok, parts := isConsulServer(m)
+		ok, parts := server_details.IsConsulServer(m)
 		if !ok {
 			continue
 		}
 		c.logger.Printf("[INFO] consul: removing server %s", parts)
-
-		// Remove the server if known
-		c.consulLock.Lock()
-		n := len(c.consuls)
-		for i := 0; i < n; i++ {
-			if c.consuls[i].Name == parts.Name {
-				c.consuls[i], c.consuls[n-1] = c.consuls[n-1], nil
-				c.consuls = c.consuls[:n-1]
-				break
-			}
-		}
-		c.consulLock.Unlock()
+		c.serverMgr.RemoveServer(parts)
 	}
 }
@@ -328,50 +322,33 @@ func (c *Client) localEvent(event serf.UserEvent) {

 // RPC is used to forward an RPC call to a consul server, or fail if no servers
 func (c *Client) RPC(method string, args interface{}, reply interface{}) error {
-	// Check the last rpc time
-	var server *serverParts
-	if time.Now().Sub(c.lastRPCTime) < clientRPCCache {
-		server = c.lastServer
-		if server != nil {
-			goto TRY_RPC
-		}
-	}
-
-	// Bail if we can't find any servers
-	c.consulLock.RLock()
-	if len(c.consuls) == 0 {
-		c.consulLock.RUnlock()
+	server := c.serverMgr.FindServer()
+	if server == nil {
 		return structs.ErrNoServers
 	}

-	// Select a random addr
-	server = c.consuls[rand.Int31()%int32(len(c.consuls))]
-	c.consulLock.RUnlock()
-
 	// Forward to remote Consul
-TRY_RPC:
 	if err := c.connPool.RPC(c.config.Datacenter, server.Addr, server.Version, method, args, reply); err != nil {
-		c.lastServer = nil
-		c.lastRPCTime = time.Time{}
+		c.serverMgr.NotifyFailedServer(server)
 		c.logger.Printf("[ERR] consul: RPC failed to server %s: %v", server.Addr, err)
 		return err
 	}

-	// Cache the last server
-	c.lastServer = server
-	c.lastRPCTime = time.Now()
 	return nil
 }

 // Stats is used to return statistics for debugging and insight
 // for various sub-systems
 func (c *Client) Stats() map[string]map[string]string {
+	numServers := c.serverMgr.NumServers()
+
 	toString := func(v uint64) string {
 		return strconv.FormatUint(v, 10)
 	}
 	stats := map[string]map[string]string{
 		"consul": map[string]string{
 			"server":        "false",
-			"known_servers": toString(uint64(len(c.consuls))),
+			"known_servers": toString(uint64(numServers)),
 		},
 		"serf_lan": c.serf.Stats(),
 		"runtime":  runtimeStats(),
consul/client_test.go

@@ -83,6 +83,11 @@ func TestClient_JoinLAN(t *testing.T) {
 	if _, err := c1.JoinLAN([]string{addr}); err != nil {
 		t.Fatalf("err: %v", err)
 	}
+	testutil.WaitForResult(func() (bool, error) {
+		return c1.serverMgr.NumServers() == 1, nil
+	}, func(err error) {
+		t.Fatalf("expected consul server")
+	})

 	// Check the members
 	testutil.WaitForResult(func() (bool, error) {

@@ -95,7 +100,7 @@ func TestClient_JoinLAN(t *testing.T) {

 	// Check we have a new consul
 	testutil.WaitForResult(func() (bool, error) {
-		return len(c1.consuls) == 1, nil
+		return c1.serverMgr.NumServers() == 1, nil
 	}, func(err error) {
 		t.Fatalf("expected consul server")
 	})
consul/coordinate_endpoint_test.go

@@ -118,7 +118,7 @@ func TestCoordinate_Update(t *testing.T) {
 	}

 	// Wait a while and the updates should get picked up.
-	time.Sleep(2 * s1.config.CoordinateUpdatePeriod)
+	time.Sleep(3 * s1.config.CoordinateUpdatePeriod)
 	c, err = state.CoordinateGetRaw("node1")
 	if err != nil {
 		t.Fatalf("err: %v", err)

@@ -163,7 +163,7 @@ func TestCoordinate_Update(t *testing.T) {

 	// Wait a little while for the batch routine to run, then make sure
 	// exactly one of the updates got dropped (we won't know which one).
-	time.Sleep(2 * s1.config.CoordinateUpdatePeriod)
+	time.Sleep(3 * s1.config.CoordinateUpdatePeriod)
 	numDropped := 0
 	for i := 0; i < spamLen; i++ {
 		c, err = state.CoordinateGetRaw(fmt.Sprintf("bogusnode%d", i))

@@ -269,7 +269,7 @@ func TestCoordinate_ListNodes(t *testing.T) {
 	if err := msgpackrpc.CallWithCodec(codec, "Coordinate.Update", &arg3, &out); err != nil {
 		t.Fatalf("err: %v", err)
 	}
-	time.Sleep(2 * s1.config.CoordinateUpdatePeriod)
+	time.Sleep(3 * s1.config.CoordinateUpdatePeriod)

 	// Now query back for all the nodes.
 	arg := structs.DCSpecificRequest{
consul/leader.go

@@ -8,6 +8,7 @@ import (
 	"time"

 	"github.com/armon/go-metrics"
+	"github.com/hashicorp/consul/consul/server_details"
 	"github.com/hashicorp/consul/consul/structs"
 	"github.com/hashicorp/raft"
 	"github.com/hashicorp/serf/serf"

@@ -349,7 +350,7 @@ func (s *Server) shouldHandleMember(member serf.Member) bool {
 	if valid, dc := isConsulNode(member); valid && dc == s.config.Datacenter {
 		return true
 	}
-	if valid, parts := isConsulServer(member); valid && parts.Datacenter == s.config.Datacenter {
+	if valid, parts := server_details.IsConsulServer(member); valid && parts.Datacenter == s.config.Datacenter {
 		return true
 	}
 	return false

@@ -360,7 +361,7 @@ func (s *Server) shouldHandleMember(member serf.Member) bool {
 func (s *Server) handleAliveMember(member serf.Member) error {
 	// Register consul service if a server
 	var service *structs.NodeService
-	if valid, parts := isConsulServer(member); valid {
+	if valid, parts := server_details.IsConsulServer(member); valid {
 		service = &structs.NodeService{
 			ID:      ConsulServiceID,
 			Service: ConsulServiceName,

@@ -496,7 +497,7 @@ func (s *Server) handleDeregisterMember(reason string, member serf.Member) error
 	}

 	// Remove from Raft peers if this was a server
-	if valid, parts := isConsulServer(member); valid {
+	if valid, parts := server_details.IsConsulServer(member); valid {
 		if err := s.removeConsulServer(member, parts.Port); err != nil {
 			return err
 		}

@@ -523,7 +524,7 @@ func (s *Server) handleDeregisterMember(reason string, member serf.Member) error
 }

 // joinConsulServer is used to try to join another consul server
-func (s *Server) joinConsulServer(m serf.Member, parts *serverParts) error {
+func (s *Server) joinConsulServer(m serf.Member, parts *server_details.ServerDetails) error {
 	// Do not join ourself
 	if m.Name == s.config.NodeName {
 		return nil

@@ -533,7 +534,7 @@ func (s *Server) joinConsulServer(m serf.Member, parts *serverParts) error {
 	if parts.Bootstrap {
 		members := s.serfLAN.Members()
 		for _, member := range members {
-			valid, p := isConsulServer(member)
+			valid, p := server_details.IsConsulServer(member)
 			if valid && member.Name != m.Name && p.Bootstrap {
 				s.logger.Printf("[ERR] consul: '%v' and '%v' are both in bootstrap mode. Only one node should be in bootstrap mode, not adding Raft peer.", m.Name, member.Name)
 				return nil
consul/merge.go

@@ -3,6 +3,7 @@ package consul
 import (
 	"fmt"

+	"github.com/hashicorp/consul/consul/server_details"
 	"github.com/hashicorp/serf/serf"
 )

@@ -24,7 +25,7 @@ func (md *lanMergeDelegate) NotifyMerge(members []*serf.Member) error {
 			continue
 		}

-		ok, parts := isConsulServer(*m)
+		ok, parts := server_details.IsConsulServer(*m)
 		if ok && parts.Datacenter != md.dc {
 			return fmt.Errorf("Member '%s' part of wrong datacenter '%s'",
 				m.Name, parts.Datacenter)

@@ -41,7 +42,7 @@ type wanMergeDelegate struct {

 func (md *wanMergeDelegate) NotifyMerge(members []*serf.Member) error {
 	for _, m := range members {
-		ok, _ := isConsulServer(*m)
+		ok, _ := server_details.IsConsulServer(*m)
 		if !ok {
 			return fmt.Errorf("Member '%s' is not a server", m.Name)
 		}
consul/prepared_query_endpoint_test.go

@@ -1537,7 +1537,7 @@ func TestPreparedQuery_Execute(t *testing.T) {
 		if err := msgpackrpc.CallWithCodec(codec1, "Coordinate.Update", &req, &out); err != nil {
 			t.Fatalf("err: %v", err)
 		}
-		time.Sleep(2 * s1.config.CoordinateUpdatePeriod)
+		time.Sleep(3 * s1.config.CoordinateUpdatePeriod)
 	}

 	// Try an RTT sort. We don't have any other coordinates in there but
consul/serf.go

@@ -4,6 +4,7 @@ import (
 	"net"
 	"strings"

+	"github.com/hashicorp/consul/consul/server_details"
 	"github.com/hashicorp/serf/serf"
 )

@@ -140,7 +141,7 @@ func (s *Server) localEvent(event serf.UserEvent) {
 // lanNodeJoin is used to handle join events on the LAN pool.
 func (s *Server) lanNodeJoin(me serf.MemberEvent) {
 	for _, m := range me.Members {
-		ok, parts := isConsulServer(m)
+		ok, parts := server_details.IsConsulServer(m)
 		if !ok {
 			continue
 		}

@@ -163,7 +164,7 @@ func (s *Server) lanNodeJoin(me serf.MemberEvent) {
 // wanNodeJoin is used to handle join events on the WAN pool.
 func (s *Server) wanNodeJoin(me serf.MemberEvent) {
 	for _, m := range me.Members {
-		ok, parts := isConsulServer(m)
+		ok, parts := server_details.IsConsulServer(m)
 		if !ok {
 			s.logger.Printf("[WARN] consul: non-server in WAN pool: %s", m.Name)
 			continue

@@ -209,7 +210,7 @@ func (s *Server) maybeBootstrap() {
 	members := s.serfLAN.Members()
 	addrs := make([]string, 0)
 	for _, member := range members {
-		valid, p := isConsulServer(member)
+		valid, p := server_details.IsConsulServer(member)
 		if !valid {
 			continue
 		}

@@ -247,7 +248,7 @@ func (s *Server) maybeBootstrap() {
 // lanNodeFailed is used to handle fail events on the LAN pool.
 func (s *Server) lanNodeFailed(me serf.MemberEvent) {
 	for _, m := range me.Members {
-		ok, parts := isConsulServer(m)
+		ok, parts := server_details.IsConsulServer(m)
 		if !ok {
 			continue
 		}

@@ -262,7 +263,7 @@ func (s *Server) lanNodeFailed(me serf.MemberEvent) {
 // wanNodeFailed is used to handle fail events on the WAN pool.
 func (s *Server) wanNodeFailed(me serf.MemberEvent) {
 	for _, m := range me.Members {
-		ok, parts := isConsulServer(m)
+		ok, parts := server_details.IsConsulServer(m)
 		if !ok {
 			continue
 		}
consul/server.go

@@ -15,6 +15,7 @@ import (
 	"time"

 	"github.com/hashicorp/consul/acl"
+	"github.com/hashicorp/consul/consul/server_details"
 	"github.com/hashicorp/consul/consul/state"
 	"github.com/hashicorp/consul/tlsutil"
 	"github.com/hashicorp/raft"

@@ -97,7 +98,7 @@ type Server struct {
 	// localConsuls is used to track the known consuls
 	// in the local datacenter. Used to do leader forwarding.
-	localConsuls map[string]*serverParts
+	localConsuls map[string]*server_details.ServerDetails
 	localLock    sync.RWMutex

 	// Logger uses the provided LogOutput

@@ -119,7 +120,7 @@ type Server struct {
 	// remoteConsuls is used to track the known consuls in
 	// remote datacenters. Used to do DC forwarding.
-	remoteConsuls map[string][]*serverParts
+	remoteConsuls map[string][]*server_details.ServerDetails
 	remoteLock    sync.RWMutex

 	// rpcListener is used to listen for incoming connections

@@ -216,10 +217,10 @@ func NewServer(config *Config) (*Server, error) {
 		connPool:      NewPool(config.LogOutput, serverRPCCache, serverMaxStreams, tlsWrap),
 		eventChLAN:    make(chan serf.Event, 256),
 		eventChWAN:    make(chan serf.Event, 256),
-		localConsuls:  make(map[string]*serverParts),
+		localConsuls:  make(map[string]*server_details.ServerDetails),
 		logger:        logger,
 		reconcileCh:   make(chan serf.Member, 32),
-		remoteConsuls: make(map[string][]*serverParts),
+		remoteConsuls: make(map[string][]*server_details.ServerDetails),
 		rpcServer:     rpc.NewServer(),
 		rpcTLS:        incomingTLS,
 		tombstoneGC:   gc,
consul/server_details/server_details.go (new file)

@@ -0,0 +1,70 @@
package server_details

import (
	"fmt"
	"net"
	"strconv"

	"github.com/hashicorp/serf/serf"
)

// ServerDetails is used to return details of a consul server
type ServerDetails struct {
	Name       string
	Datacenter string
	Port       int
	Bootstrap  bool
	Expect     int
	Version    int
	Addr       net.Addr
}

func (s *ServerDetails) String() string {
	return fmt.Sprintf("%s (Addr: %s) (DC: %s)", s.Name, s.Addr, s.Datacenter)
}

// IsConsulServer returns true if a serf member is a consul server. Returns a
// bool and a pointer to the ServerDetails.
func IsConsulServer(m serf.Member) (bool, *ServerDetails) {
	if m.Tags["role"] != "consul" {
		return false, nil
	}

	datacenter := m.Tags["dc"]
	_, bootstrap := m.Tags["bootstrap"]

	expect := 0
	expect_str, ok := m.Tags["expect"]
	var err error
	if ok {
		expect, err = strconv.Atoi(expect_str)
		if err != nil {
			return false, nil
		}
	}

	port_str := m.Tags["port"]
	port, err := strconv.Atoi(port_str)
	if err != nil {
		return false, nil
	}

	vsn_str := m.Tags["vsn"]
	vsn, err := strconv.Atoi(vsn_str)
	if err != nil {
		return false, nil
	}

	addr := &net.TCPAddr{IP: m.Addr, Port: port}

	parts := &ServerDetails{
		Name:       m.Name,
		Datacenter: datacenter,
		Port:       port,
		Bootstrap:  bootstrap,
		Expect:     expect,
		Addr:       addr,
		Version:    vsn,
	}
	return true, parts
}
consul/server_details/server_details_test.go (new file)

@@ -0,0 +1,66 @@
package server_details_test

import (
	"net"
	"testing"

	"github.com/hashicorp/consul/consul/server_details"
	"github.com/hashicorp/serf/serf"
)

func TestIsConsulServer(t *testing.T) {
	m := serf.Member{
		Name: "foo",
		Addr: net.IP([]byte{127, 0, 0, 1}),
		Tags: map[string]string{
			"role": "consul",
			"dc":   "east-aws",
			"port": "10000",
			"vsn":  "1",
		},
	}
	ok, parts := server_details.IsConsulServer(m)
	if !ok || parts.Datacenter != "east-aws" || parts.Port != 10000 {
		t.Fatalf("bad: %v %v", ok, parts)
	}
	if parts.Name != "foo" {
		t.Fatalf("bad: %v", parts)
	}
	if parts.Bootstrap {
		t.Fatalf("unexpected bootstrap")
	}
	if parts.Expect != 0 {
		t.Fatalf("bad: %v", parts.Expect)
	}
	m.Tags["bootstrap"] = "1"
	m.Tags["disabled"] = "1"
	ok, parts = server_details.IsConsulServer(m)
	if !ok {
		t.Fatalf("expected a valid consul server")
	}
	if !parts.Bootstrap {
		t.Fatalf("expected bootstrap")
	}
	if parts.Addr.String() != "127.0.0.1:10000" {
		t.Fatalf("bad addr: %v", parts.Addr)
	}
	if parts.Version != 1 {
		t.Fatalf("bad: %v", parts)
	}
	m.Tags["expect"] = "3"
	delete(m.Tags, "bootstrap")
	delete(m.Tags, "disabled")
	ok, parts = server_details.IsConsulServer(m)
	if !ok || parts.Expect != 3 {
		t.Fatalf("bad: %v", parts.Expect)
	}
	if parts.Bootstrap {
		t.Fatalf("unexpected bootstrap")
	}

	delete(m.Tags, "role")
	ok, parts = server_details.IsConsulServer(m)
	if ok {
		t.Fatalf("unexpected ok server")
	}
}
consul/server_manager/server_manager.go (new file)

@@ -0,0 +1,311 @@
package server_manager

import (
	"log"
	"math/rand"
	"sync"
	"sync/atomic"
	"time"

	"github.com/hashicorp/consul/consul/server_details"
	"github.com/hashicorp/consul/lib"
)

type consulServerEventTypes int

const (
	// clientRPCJitterFraction determines the amount of jitter added to
	// clientRPCMinReuseDuration before a connection is expired and a new
	// connection is established in order to rebalance load across consul
	// servers. The cluster-wide number of connections per second from
	// rebalancing is applied after this jitter to ensure the CPU impact
	// is always finite. See newRebalanceConnsPerSecPerServer's comment
	// for additional commentary.
	//
	// For example, in a 10K consul cluster with 5x servers, this default
	// averages out to ~13 new connections from rebalancing per server
	// per second (each connection is reused for 120s to 180s).
	clientRPCJitterFraction = 2

	// clientRPCMinReuseDuration controls the minimum amount of time RPC
	// queries are sent over an established connection to a single server
	clientRPCMinReuseDuration = 120 * time.Second

	// newRebalanceConnsPerSecPerServer limits the number of new
	// connections a server receives per second for connection
	// rebalancing. This limit caps the load caused by continual
	// rebalancing efforts when a cluster is in equilibrium. A lower
	// value comes at the cost of increased recovery time after a
	// partition. This parameter begins to take effect when there are
	// more than ~48K clients querying 5x servers or at lower server
	// values when there is a partition.
	//
	// For example, in a 100K consul cluster with 5x servers, it will
	// take ~5min for all servers to rebalance their connections. If
	// 99,995 agents are in the minority talking to only one server, it
	// will take ~26min for all servers to rebalance. A 10K cluster in
	// the same scenario will take ~2.6min to rebalance.
	newRebalanceConnsPerSecPerServer = 64
)

// ConsulClusterInfo is an interface that wraps serf and is used to estimate
// the approximate number of nodes in a cluster.
type ConsulClusterInfo interface {
	NumNodes() int
}

// serverConfig is the thread-safe configuration struct used to maintain the
// list of Consul servers in ServerManager.
//
// NOTE(sean@): We are explicitly relying on the fact that serverConfig will
// be copied onto the stack. Please keep this structure light.
type serverConfig struct {
	// servers tracks the locally known servers. List membership is
	// maintained by Serf.
	servers []*server_details.ServerDetails
}

type ServerManager struct {
	// serverConfigValue provides the necessary load/store semantics for
	// the server list.
	serverConfigValue atomic.Value
	serverConfigLock  sync.Mutex

	// shutdownCh is a copy of the channel in consul.Client
	shutdownCh chan struct{}

	logger *log.Logger

	// clusterInfo is used to estimate the approximate number of nodes in
	// a cluster and limit the rate at which it rebalances server
	// connections. ConsulClusterInfo is an interface that wraps serf.
	clusterInfo ConsulClusterInfo

	// notifyFailedBarrier acts as a barrier to prevent queueing behind
	// serverConfigLock and acts as a TryLock().
	notifyFailedBarrier int32
}

// AddServer takes out an internal write lock and adds a new server. If the
// server is not known, it appends the server to the list. The new server
// will begin seeing use after the rebalance timer fires or enough servers
// fail organically. If the server is already known, merge the new server
// details.
func (sm *ServerManager) AddServer(server *server_details.ServerDetails) {
	sm.serverConfigLock.Lock()
	defer sm.serverConfigLock.Unlock()
	serverCfg := sm.getServerConfig()

	// Check if this server is known
	found := false
	for idx, existing := range serverCfg.servers {
		if existing.Name == server.Name {
			newServers := make([]*server_details.ServerDetails, len(serverCfg.servers))
			copy(newServers, serverCfg.servers)

			// Overwrite the existing server details in order to
			// possibly update metadata (e.g. server version)
			newServers[idx] = server

			serverCfg.servers = newServers
			found = true
			break
		}
	}

	// Add to the list if not known
	if !found {
		newServers := make([]*server_details.ServerDetails, len(serverCfg.servers), len(serverCfg.servers)+1)
		copy(newServers, serverCfg.servers)
		newServers = append(newServers, server)
		serverCfg.servers = newServers
	}

	sm.saveServerConfig(serverCfg)
}

// cycleServer returns a new list of servers that has dequeued the first
// server and enqueued it at the end of the list. cycleServer assumes the
// caller is holding the serverConfigLock.
func (sc *serverConfig) cycleServer() (servers []*server_details.ServerDetails) {
	numServers := len(sc.servers)
	if numServers < 2 {
		return servers // No action required
	}

	newServers := make([]*server_details.ServerDetails, 0, numServers)
	newServers = append(newServers, sc.servers[1:]...)
	newServers = append(newServers, sc.servers[0])
	return newServers
}

// FindServer takes out an internal "read lock" and searches through the list
// of servers to find a "healthy" server. If the server is actually
// unhealthy, we rely on Serf to detect this and remove the node from the
// server list. If the server at the front of the list has failed or fails
// during an RPC call, it is rotated to the end of the list. If there are no
// servers available, return nil.
func (sm *ServerManager) FindServer() *server_details.ServerDetails {
	serverCfg := sm.getServerConfig()
	numServers := len(serverCfg.servers)
	if numServers == 0 {
		sm.logger.Printf("[WARN] consul: No servers available")
		return nil
	}

	// Return whatever is at the front of the list because it is assumed
	// to be the oldest in the server list (unless - hypothetically - the
	// server list was rotated right after a server was added).
	return serverCfg.servers[0]
}

// getServerConfig is a convenience method which hides the locking semantics
// of atomic.Value from the caller.
func (sm *ServerManager) getServerConfig() serverConfig {
	return sm.serverConfigValue.Load().(serverConfig)
}

// saveServerConfig is a convenience method which hides the locking semantics
// of atomic.Value from the caller.
func (sm *ServerManager) saveServerConfig(sc serverConfig) {
	sm.serverConfigValue.Store(sc)
}

// New is the only way to safely create a new ServerManager struct.
func New(logger *log.Logger, shutdownCh chan struct{}, clusterInfo ConsulClusterInfo) (sm *ServerManager) {
	// NOTE(sean@): Can't pass *consul.Client due to an import cycle
	sm = new(ServerManager)
	sm.logger = logger
	sm.clusterInfo = clusterInfo
	sm.shutdownCh = shutdownCh

	sc := serverConfig{}
	sc.servers = make([]*server_details.ServerDetails, 0)
	sm.saveServerConfig(sc)
	return sm
}

// NotifyFailedServer marks the passed in server as "failed" by rotating it
// to the end of the server list.
func (sm *ServerManager) NotifyFailedServer(server *server_details.ServerDetails) {
	serverCfg := sm.getServerConfig()

	// If the server being failed is not the first server on the list,
	// this is a noop. If, however, the server is failed and first on
	// the list, acquire the lock, retest, and take the penalty of moving
	// the server to the end of the list.

	// Only rotate the server list when there is more than one server
	if len(serverCfg.servers) > 1 && serverCfg.servers[0] == server &&
		// Use atomic.CAS to emulate a TryLock().
		atomic.CompareAndSwapInt32(&sm.notifyFailedBarrier, 0, 1) {
		defer atomic.StoreInt32(&sm.notifyFailedBarrier, 0)

		// Grab a lock, retest, and take the hit of cycling the first
		// server to the end.
		sm.serverConfigLock.Lock()
		defer sm.serverConfigLock.Unlock()
		serverCfg = sm.getServerConfig()

		if len(serverCfg.servers) > 1 && serverCfg.servers[0] == server {
			serverCfg.servers = serverCfg.cycleServer()
			sm.saveServerConfig(serverCfg)
		}
	}
}

// NumServers takes out an internal "read lock" and returns the number of
// servers. numServers includes both healthy and unhealthy servers.
func (sm *ServerManager) NumServers() (numServers int) {
	serverCfg := sm.getServerConfig()
	numServers = len(serverCfg.servers)
	return numServers
}

// RebalanceServers takes out an internal write lock and shuffles the list of
// servers on this agent. This allows for a redistribution of work across
// consul servers and provides a guarantee that the order of the server list
// isn't related to the age at which the node was added to the cluster.
// Elsewhere we rely on the position in the server list as a hint regarding
// the stability of a server relative to its position in the server list.
// Servers at or near the front of the list are more stable than servers near
// the end of the list. Unhealthy servers are removed when serf notices the
// server has been deregistered.
func (sm *ServerManager) RebalanceServers() {
	sm.serverConfigLock.Lock()
	defer sm.serverConfigLock.Unlock()
	serverCfg := sm.getServerConfig()

	newServers := make([]*server_details.ServerDetails, len(serverCfg.servers))
	copy(newServers, serverCfg.servers)

	// Shuffle the server list with a Fisher-Yates pass
	for i := len(serverCfg.servers) - 1; i > 0; i-- {
		j := rand.Int31n(int32(i + 1))
		newServers[i], newServers[j] = newServers[j], newServers[i]
	}
	serverCfg.servers = newServers

	sm.saveServerConfig(serverCfg)
}

// RemoveServer takes out an internal write lock and removes a server from
// the server list.
func (sm *ServerManager) RemoveServer(server *server_details.ServerDetails) {
	sm.serverConfigLock.Lock()
	defer sm.serverConfigLock.Unlock()
	serverCfg := sm.getServerConfig()

	// Remove the server if known
	for i := range serverCfg.servers {
		if serverCfg.servers[i].Name == server.Name {
			newServers := make([]*server_details.ServerDetails, 0, len(serverCfg.servers)-1)
			newServers = append(newServers, serverCfg.servers[:i]...)
			newServers = append(newServers, serverCfg.servers[i+1:]...)
			serverCfg.servers = newServers

			sm.saveServerConfig(serverCfg)
			return
		}
	}
}

// refreshServerRebalanceTimer is only called once the rebalanceTimer
// expires. Historically this was an expensive routine and is intended to be
// run in isolation in a dedicated, non-concurrent task.
func (sm *ServerManager) refreshServerRebalanceTimer(timer *time.Timer) time.Duration {
	serverCfg := sm.getServerConfig()
	numConsulServers := len(serverCfg.servers)
	// Limit this connection's life based on the size (and health) of the
	// cluster. Never rebalance a connection more frequently than
	// connReuseLowWatermarkDuration, and make sure we never exceed
	// clusterWideRebalanceConnsPerSec operations/s across numLANMembers.
	clusterWideRebalanceConnsPerSec := float64(numConsulServers * newRebalanceConnsPerSecPerServer)
	connReuseLowWatermarkDuration := clientRPCMinReuseDuration + lib.RandomStagger(clientRPCMinReuseDuration/clientRPCJitterFraction)
	numLANMembers := sm.clusterInfo.NumNodes()
	connRebalanceTimeout := lib.RateScaledInterval(clusterWideRebalanceConnsPerSec, connReuseLowWatermarkDuration, numLANMembers)

	timer.Reset(connRebalanceTimeout)
	return connRebalanceTimeout
}

// Start is used to start and manage the task of automatically shuffling and
// rebalancing the list of consul servers. This maintenance only happens
// periodically based on the expiration of the timer. Failed servers are
// automatically cycled to the end of the list. New servers are appended to
// the list. The order of the server list must be shuffled periodically to
// distribute load across all known and available consul servers.
func (sm *ServerManager) Start() {
	rebalanceTimer := time.NewTimer(clientRPCMinReuseDuration)

	for {
		select {
		case <-rebalanceTimer.C:
			sm.logger.Printf("[INFO] server manager: Rebalancing server connections")
			sm.RebalanceServers()
			sm.refreshServerRebalanceTimer(rebalanceTimer)

		case <-sm.shutdownCh:
			sm.logger.Printf("[INFO] server manager: shutting down")
			return
		}
	}
}
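Read together with the rewritten Client.RPC in consul/client.go above, the intended calling contract is roughly the following. This is a sketch, not code from this commit; doRequest stands in for the connection-pool RPC call:

package example

import (
	"errors"

	"github.com/hashicorp/consul/consul/server_details"
	"github.com/hashicorp/consul/consul/server_manager"
)

// callOnce sketches the contract: take the server at the front of the
// list, and on failure rotate it to the back before surfacing the error.
func callOnce(sm *server_manager.ServerManager,
	doRequest func(*server_details.ServerDetails) error) error {
	server := sm.FindServer() // front of the rotated list, or nil
	if server == nil {
		return errors.New("no consul servers available")
	}
	if err := doRequest(server); err != nil {
		// Cycle the failed server to the back so the next caller
		// tries a different one.
		sm.NotifyFailedServer(server)
		return err
	}
	return nil
}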
consul/server_manager/server_manager_internal_test.go (new file)

@@ -0,0 +1,225 @@
package server_manager

import (
	"bytes"
	"fmt"
	"log"
	"os"
	"testing"
	"time"

	"github.com/hashicorp/consul/consul/server_details"
)

var (
	localLogger    *log.Logger
	localLogBuffer *bytes.Buffer
)

func init() {
	localLogBuffer = new(bytes.Buffer)
	localLogger = log.New(localLogBuffer, "", 0)
}

func GetBufferedLogger() *log.Logger {
	return localLogger
}

type fauxSerf struct {
	numNodes int
}

func (s *fauxSerf) NumNodes() int {
	return s.numNodes
}

func testServerManager() (sm *ServerManager) {
	logger := GetBufferedLogger()
	shutdownCh := make(chan struct{})
	sm = New(logger, shutdownCh, &fauxSerf{numNodes: 16384})
	return sm
}

// func (sc *serverConfig) cycleServer() (servers []*server_details.ServerDetails) {
func TestServerManagerInternal_cycleServer(t *testing.T) {
	sm := testServerManager()
	sc := sm.getServerConfig()

	server0 := &server_details.ServerDetails{Name: "server1"}
	server1 := &server_details.ServerDetails{Name: "server2"}
	server2 := &server_details.ServerDetails{Name: "server3"}
	sc.servers = append(sc.servers, server0, server1, server2)
	sm.saveServerConfig(sc)

	sc = sm.getServerConfig()
	if len(sc.servers) != 3 {
		t.Fatalf("server length incorrect: %d/3", len(sc.servers))
	}
	if sc.servers[0] != server0 &&
		sc.servers[1] != server1 &&
		sc.servers[2] != server2 {
		t.Fatalf("initial server ordering not correct")
	}

	sc.servers = sc.cycleServer()
	if len(sc.servers) != 3 {
		t.Fatalf("server length incorrect: %d/3", len(sc.servers))
	}
	if sc.servers[0] != server1 &&
		sc.servers[1] != server2 &&
		sc.servers[2] != server0 {
		t.Fatalf("server ordering after one cycle not correct")
	}

	sc.servers = sc.cycleServer()
	if len(sc.servers) != 3 {
		t.Fatalf("server length incorrect: %d/3", len(sc.servers))
	}
	if sc.servers[0] != server2 &&
		sc.servers[1] != server0 &&
		sc.servers[2] != server1 {
		t.Fatalf("server ordering after two cycles not correct")
	}

	sc.servers = sc.cycleServer()
	if len(sc.servers) != 3 {
		t.Fatalf("server length incorrect: %d/3", len(sc.servers))
	}
	if sc.servers[0] != server0 &&
		sc.servers[1] != server1 &&
		sc.servers[2] != server2 {
		t.Fatalf("server ordering after three cycles not correct")
	}
}

// func (sm *ServerManager) getServerConfig() serverConfig {
func TestServerManagerInternal_getServerConfig(t *testing.T) {
	sm := testServerManager()
	sc := sm.getServerConfig()
	if sc.servers == nil {
		t.Fatalf("serverConfig.servers nil")
	}

	if len(sc.servers) != 0 {
		t.Fatalf("serverConfig.servers length not zero")
	}
}

// func New(logger *log.Logger, shutdownCh chan struct{}, clusterInfo ConsulClusterInfo) (sm *ServerManager) {
func TestServerManagerInternal_New(t *testing.T) {
	sm := testServerManager()
	if sm == nil {
		t.Fatalf("ServerManager nil")
	}

	if sm.clusterInfo == nil {
		t.Fatalf("ServerManager.clusterInfo nil")
	}

	if sm.logger == nil {
		t.Fatalf("ServerManager.logger nil")
	}

	if sm.shutdownCh == nil {
		t.Fatalf("ServerManager.shutdownCh nil")
	}
}

// func (sm *ServerManager) refreshServerRebalanceTimer(timer *time.Timer) time.Duration {
func TestServerManagerInternal_refreshServerRebalanceTimer(t *testing.T) {
	sm := testServerManager()

	timer := time.NewTimer(time.Duration(1 * time.Nanosecond))
	time.Sleep(1 * time.Millisecond)
	sm.refreshServerRebalanceTimer(timer)

	logger := log.New(os.Stderr, "", log.LstdFlags)
	shutdownCh := make(chan struct{})

	type clusterSizes struct {
		numNodes     int
		numServers   int
		minRebalance time.Duration
	}
	clusters := []clusterSizes{
		{0, 3, 2 * time.Minute},
		{1, 0, 2 * time.Minute}, // partitioned cluster
		{1, 3, 2 * time.Minute},
		{2, 3, 2 * time.Minute},
		{100, 0, 2 * time.Minute}, // partitioned
		{100, 1, 2 * time.Minute}, // partitioned
		{100, 3, 2 * time.Minute},
		{1024, 1, 2 * time.Minute}, // partitioned
		{1024, 3, 2 * time.Minute}, // partitioned
		{1024, 5, 2 * time.Minute},
		{16384, 1, 4 * time.Minute}, // partitioned
		{16384, 2, 2 * time.Minute}, // partitioned
		{16384, 3, 2 * time.Minute}, // partitioned
		{16384, 5, 2 * time.Minute},
		{65535, 0, 2 * time.Minute}, // partitioned
		{65535, 1, 8 * time.Minute}, // partitioned
		{65535, 2, 3 * time.Minute}, // partitioned
		{65535, 3, 5 * time.Minute}, // partitioned
		{65535, 5, 3 * time.Minute}, // partitioned
		{65535, 7, 2 * time.Minute},
		{1000000, 1, 4 * time.Hour},     // partitioned
		{1000000, 2, 2 * time.Hour},     // partitioned
		{1000000, 3, 80 * time.Minute},  // partitioned
		{1000000, 5, 50 * time.Minute},  // partitioned
		{1000000, 11, 20 * time.Minute}, // partitioned
		{1000000, 19, 10 * time.Minute},
	}

	for _, s := range clusters {
		sm := New(logger, shutdownCh, &fauxSerf{numNodes: s.numNodes})

		for i := 0; i < s.numServers; i++ {
			nodeName := fmt.Sprintf("s%02d", i)
			sm.AddServer(&server_details.ServerDetails{Name: nodeName})
		}

		d := sm.refreshServerRebalanceTimer(timer)
		if d < s.minRebalance {
			t.Fatalf("duration too short for cluster of size %d and %d servers (%s < %s)", s.numNodes, s.numServers, d, s.minRebalance)
		}
	}
}

// func (sm *ServerManager) saveServerConfig(sc serverConfig) {
func TestServerManagerInternal_saveServerConfig(t *testing.T) {
	sm := testServerManager()

	// Initial condition
	func() {
		sc := sm.getServerConfig()
		if len(sc.servers) != 0 {
			t.Fatalf("ServerManager.saveServerConfig failed to load init config")
		}

		newServer := new(server_details.ServerDetails)
		sc.servers = append(sc.servers, newServer)
		sm.saveServerConfig(sc)
	}()

	// Test that save works
	func() {
		sc1 := sm.getServerConfig()
		t1NumServers := len(sc1.servers)
		if t1NumServers != 1 {
			t.Fatalf("ServerManager.saveServerConfig failed to save mutated config")
		}
	}()

	// Verify that a mutation without a save doesn't alter the original
	func() {
		newServer := new(server_details.ServerDetails)
		sc := sm.getServerConfig()
		sc.servers = append(sc.servers, newServer)

		sc_orig := sm.getServerConfig()
		origNumServers := len(sc_orig.servers)
		if origNumServers >= len(sc.servers) {
			t.Fatalf("ServerManager.saveServerConfig unsaved config overwrote original")
		}
	}()
}
@@ -0,0 +1,359 @@
package server_manager_test

import (
	"bytes"
	"fmt"
	"log"
	"os"
	"strings"
	"testing"

	"github.com/hashicorp/consul/consul/server_details"
	"github.com/hashicorp/consul/consul/server_manager"
)

var (
	localLogger    *log.Logger
	localLogBuffer *bytes.Buffer
)

func init() {
	localLogBuffer = new(bytes.Buffer)
	localLogger = log.New(localLogBuffer, "", 0)
}

func GetBufferedLogger() *log.Logger {
	return localLogger
}

type fauxSerf struct {
}

func (s *fauxSerf) NumNodes() int {
	return 16384
}

func testServerManager() (sm *server_manager.ServerManager) {
	logger := GetBufferedLogger()
	logger = log.New(os.Stderr, "", log.LstdFlags)
	shutdownCh := make(chan struct{})
	sm = server_manager.New(logger, shutdownCh, &fauxSerf{})
	return sm
}

// func (sm *ServerManager) AddServer(server *server_details.ServerDetails) {
func TestServerManager_AddServer(t *testing.T) {
	sm := testServerManager()
	var num int
	num = sm.NumServers()
	if num != 0 {
		t.Fatalf("Expected zero servers to start")
	}

	s1 := &server_details.ServerDetails{Name: "s1"}
	sm.AddServer(s1)
	num = sm.NumServers()
	if num != 1 {
		t.Fatalf("Expected one server")
	}

	sm.AddServer(s1)
	num = sm.NumServers()
	if num != 1 {
		t.Fatalf("Expected one server (still)")
	}

	s2 := &server_details.ServerDetails{Name: "s2"}
	sm.AddServer(s2)
	num = sm.NumServers()
	if num != 2 {
		t.Fatalf("Expected two servers")
	}
}

// func (sm *ServerManager) FindServer() (server *server_details.ServerDetails) {
func TestServerManager_FindServer(t *testing.T) {
	sm := testServerManager()

	if sm.FindServer() != nil {
		t.Fatalf("Expected nil return")
	}

	sm.AddServer(&server_details.ServerDetails{Name: "s1"})
	if sm.NumServers() != 1 {
		t.Fatalf("Expected one server")
	}

	s1 := sm.FindServer()
	if s1 == nil {
		t.Fatalf("Expected non-nil server")
	}
	if s1.Name != "s1" {
		t.Fatalf("Expected s1 server")
	}

	s1 = sm.FindServer()
	if s1 == nil || s1.Name != "s1" {
		t.Fatalf("Expected s1 server (still)")
	}

	sm.AddServer(&server_details.ServerDetails{Name: "s2"})
	if sm.NumServers() != 2 {
		t.Fatalf("Expected two servers")
	}
	s1 = sm.FindServer()
	if s1 == nil || s1.Name != "s1" {
		t.Fatalf("Expected s1 server (still)")
	}

	sm.NotifyFailedServer(s1)
	s2 := sm.FindServer()
	if s2 == nil || s2.Name != "s2" {
		t.Fatalf("Expected s2 server")
	}

	sm.NotifyFailedServer(s2)
	s1 = sm.FindServer()
	if s1 == nil || s1.Name != "s1" {
		t.Fatalf("Expected s1 server")
	}
}

// func New(logger *log.Logger, shutdownCh chan struct{}) (sm *ServerManager) {
func TestServerManager_New(t *testing.T) {
	logger := GetBufferedLogger()
	logger = log.New(os.Stderr, "", log.LstdFlags)
	shutdownCh := make(chan struct{})
	sm := server_manager.New(logger, shutdownCh, &fauxSerf{})
	if sm == nil {
		t.Fatalf("ServerManager nil")
	}
}

// func (sm *ServerManager) NotifyFailedServer(server *server_details.ServerDetails) {
func TestServerManager_NotifyFailedServer(t *testing.T) {
	sm := testServerManager()

	if sm.NumServers() != 0 {
		t.Fatalf("Expected zero servers to start")
	}

	s1 := &server_details.ServerDetails{Name: "s1"}
	s2 := &server_details.ServerDetails{Name: "s2"}

	// Try notifying for a server that is not part of the server manager
	sm.NotifyFailedServer(s1)
	if sm.NumServers() != 0 {
		t.Fatalf("Expected zero servers to start")
	}
	sm.AddServer(s1)

	// Test again w/ a server not in the list
	sm.NotifyFailedServer(s2)
	if sm.NumServers() != 1 {
		t.Fatalf("Expected one server")
	}

	sm.AddServer(s2)
	if sm.NumServers() != 2 {
		t.Fatalf("Expected two servers")
	}

	s1 = sm.FindServer()
	if s1 == nil || s1.Name != "s1" {
		t.Fatalf("Expected s1 server")
	}

	sm.NotifyFailedServer(s2)
	s1 = sm.FindServer()
	if s1 == nil || s1.Name != "s1" {
		t.Fatalf("Expected s1 server (still)")
	}

	sm.NotifyFailedServer(s1)
	s2 = sm.FindServer()
	if s2 == nil || s2.Name != "s2" {
		t.Fatalf("Expected s2 server")
	}

	sm.NotifyFailedServer(s2)
	s1 = sm.FindServer()
	if s1 == nil || s1.Name != "s1" {
		t.Fatalf("Expected s1 server")
	}
}

// func (sm *ServerManager) NumServers() (numServers int) {
func TestServerManager_NumServers(t *testing.T) {
	sm := testServerManager()
	var num int
	num = sm.NumServers()
	if num != 0 {
		t.Fatalf("Expected zero servers to start")
	}

	s := &server_details.ServerDetails{}
	sm.AddServer(s)
	num = sm.NumServers()
	if num != 1 {
		t.Fatalf("Expected one server after AddServer")
	}
}

// func (sm *ServerManager) RebalanceServers() {
func TestServerManager_RebalanceServers(t *testing.T) {
	sm := testServerManager()
	const maxServers = 100
	const numShuffleTests = 100
	const uniquePassRate = 0.5

	// Make a huge list of nodes.
	for i := 0; i < maxServers; i++ {
		nodeName := fmt.Sprintf("s%02d", i)
		sm.AddServer(&server_details.ServerDetails{Name: nodeName})
	}

	// Keep track of how many unique shuffles we get.
	uniques := make(map[string]struct{}, maxServers)
	for i := 0; i < numShuffleTests; i++ {
		sm.RebalanceServers()

		var names []string
		for j := 0; j < maxServers; j++ {
			server := sm.FindServer()
			sm.NotifyFailedServer(server)
			names = append(names, server.Name)
		}
		key := strings.Join(names, "|")
		uniques[key] = struct{}{}
	}

	// We have to allow for the fact that there won't always be a unique
	// shuffle each pass, so we just look for smell here without the test
	// being flaky.
	if len(uniques) < int(maxServers*uniquePassRate) {
		t.Fatalf("unique shuffle ratio too low: %d/%d", len(uniques), maxServers)
	}
}

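The "look for smell" tolerance above can be made quantitative with a birthday bound. A self-contained sketch (not part of the test) showing why the 50% uniqueness threshold is nowhere near flaky:

```go
package main

import (
	"fmt"
	"math/big"
)

// 100 servers have 100! distinct orderings, so the chance that any two of
// 100 independent uniform shuffles collide is at most C(100,2)/100!.
func main() {
	perms := new(big.Int).MulRange(1, 100) // 100!
	pairs := big.NewInt(100 * 99 / 2)      // C(100,2) possible colliding pairs
	// Compare magnitudes by decimal length: the bound is roughly 10^-154.
	fmt.Printf("collision bound ~ 10^-%d\n", len(perms.String())-len(pairs.String()))
}
```
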
// func (sm *ServerManager) RemoveServer(server *server_details.ServerDetails) {
func TestServerManager_RemoveServer(t *testing.T) {
	const nodeNameFmt = "s%02d"
	sm := testServerManager()

	if sm.NumServers() != 0 {
		t.Fatalf("Expected zero servers to start")
	}

	// Test removing a server before it's added
	nodeName := fmt.Sprintf(nodeNameFmt, 1)
	s1 := &server_details.ServerDetails{Name: nodeName}
	sm.RemoveServer(s1)
	sm.AddServer(s1)

	nodeName = fmt.Sprintf(nodeNameFmt, 2)
	s2 := &server_details.ServerDetails{Name: nodeName}
	sm.RemoveServer(s2)
	sm.AddServer(s2)

	const maxServers = 19
	servers := make([]*server_details.ServerDetails, 0, maxServers)
	// Already added two servers above
	for i := maxServers; i > 2; i-- {
		nodeName := fmt.Sprintf(nodeNameFmt, i)
		server := &server_details.ServerDetails{Name: nodeName}
		servers = append(servers, server)
		sm.AddServer(server)
	}
	sm.RebalanceServers()

	if sm.NumServers() != maxServers {
		t.Fatalf("Expected %d servers, received %d", maxServers, sm.NumServers())
	}

	findServer := func(server *server_details.ServerDetails) bool {
		for i := sm.NumServers(); i > 0; i-- {
			s := sm.FindServer()
			if s == server {
				return true
			}
		}
		return false
	}

	expectedNumServers := maxServers
	removedServers := make([]*server_details.ServerDetails, 0, maxServers)

	// Remove servers from the front of the list
	for i := 3; i > 0; i-- {
		server := sm.FindServer()
		if server == nil {
			t.Fatalf("FindServer returned nil")
		}
		sm.RemoveServer(server)
		expectedNumServers--
		if sm.NumServers() != expectedNumServers {
			t.Fatalf("Expected %d servers (got %d)", expectedNumServers, sm.NumServers())
		}
		if findServer(server) == true {
			t.Fatalf("Did not expect to find server %s after removal from the front", server.Name)
		}
		removedServers = append(removedServers, server)
	}

	// Remove server from the end of the list
	for i := 3; i > 0; i-- {
		server := sm.FindServer()
		sm.NotifyFailedServer(server)
		sm.RemoveServer(server)
		expectedNumServers--
		if sm.NumServers() != expectedNumServers {
			t.Fatalf("Expected %d servers (got %d)", expectedNumServers, sm.NumServers())
		}
		if findServer(server) == true {
			t.Fatalf("Did not expect to find server %s", server.Name)
		}
		removedServers = append(removedServers, server)
	}

	// Remove server from the middle of the list
	for i := 3; i > 0; i-- {
		server := sm.FindServer()
		sm.NotifyFailedServer(server)
		server2 := sm.FindServer()
		sm.NotifyFailedServer(server2) // server2 now at end of the list

		sm.RemoveServer(server)
		expectedNumServers--
		if sm.NumServers() != expectedNumServers {
			t.Fatalf("Expected %d servers (got %d)", expectedNumServers, sm.NumServers())
		}
		if findServer(server) == true {
			t.Fatalf("Did not expect to find server %s", server.Name)
		}
		removedServers = append(removedServers, server)
	}

	if sm.NumServers()+len(removedServers) != maxServers {
		t.Fatalf("Expected %d+%d=%d servers", sm.NumServers(), len(removedServers), maxServers)
	}

	// Drain the remaining servers from the middle
	for i := sm.NumServers(); i > 0; i-- {
		server := sm.FindServer()
		sm.NotifyFailedServer(server)
		server2 := sm.FindServer()
		sm.NotifyFailedServer(server2) // server2 now at end of the list
		sm.RemoveServer(server)
		removedServers = append(removedServers, server)
	}

	if sm.NumServers() != 0 {
		t.Fatalf("Expected an empty server list")
	}
	if len(removedServers) != maxServers {
		t.Fatalf("Expected all servers to be in removed server list")
	}
}

// func (sm *ServerManager) Start() {

@@ -410,7 +410,7 @@ type CheckServiceNodes []CheckServiceNode
// Shuffle does an in-place random shuffle using the Fisher-Yates algorithm.
func (nodes CheckServiceNodes) Shuffle() {
	for i := len(nodes) - 1; i > 0; i-- {
		j := rand.Int31() % int32(i+1)
		j := rand.Int31n(int32(i + 1))
		nodes[i], nodes[j] = nodes[j], nodes[i]
	}
}

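The one-line change above swaps the modulo idiom for `rand.Int31n`. Beyond being the idiomatic call, `Int31n` is exactly uniform, while `Int31() % n` slightly over-weights small residues whenever 2^31 is not a multiple of n. A small self-contained sketch of the skew for n = 3:

```go
package main

import "fmt"

// Int31 draws uniformly from [0, 2^31); when 2^31 is not a multiple of n,
// the residues under %n are unevenly populated. Int31n redraws from the
// uneven tail and stays exactly uniform.
func main() {
	const space = int64(1) << 31
	const n = 3
	for r := int64(0); r < n; r++ {
		count := space / n // source values mapping to residue r under %n
		if r < space%n {
			count++ // the first (2^31 mod n) residues get one extra value
		}
		fmt.Printf("residue %d gets %d source values\n", r, count)
	}
}
```
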
@@ -23,21 +23,6 @@ import (
*/
var privateBlocks []*net.IPNet

// serverParts is used to return the parts of a server role
type serverParts struct {
	Name       string
	Datacenter string
	Port       int
	Bootstrap  bool
	Expect     int
	Version    int
	Addr       net.Addr
}

func (s *serverParts) String() string {
	return fmt.Sprintf("%s (Addr: %s) (DC: %s)", s.Name, s.Addr, s.Datacenter)
}

func init() {
	// Add each private block
	privateBlocks = make([]*net.IPNet, 6)

@@ -116,52 +101,6 @@ func CanServersUnderstandProtocol(members []serf.Member, version uint8) (bool, error) {
	return (numServers > 0) && (numWhoGrok == numServers), nil
}

// Returns if a member is a consul server. Returns a bool,
// the datacenter, and the rpc port
func isConsulServer(m serf.Member) (bool, *serverParts) {
	if m.Tags["role"] != "consul" {
		return false, nil
	}

	datacenter := m.Tags["dc"]
	_, bootstrap := m.Tags["bootstrap"]

	expect := 0
	expect_str, ok := m.Tags["expect"]
	var err error
	if ok {
		expect, err = strconv.Atoi(expect_str)
		if err != nil {
			return false, nil
		}
	}

	port_str := m.Tags["port"]
	port, err := strconv.Atoi(port_str)
	if err != nil {
		return false, nil
	}

	vsn_str := m.Tags["vsn"]
	vsn, err := strconv.Atoi(vsn_str)
	if err != nil {
		return false, nil
	}

	addr := &net.TCPAddr{IP: m.Addr, Port: port}

	parts := &serverParts{
		Name:       m.Name,
		Datacenter: datacenter,
		Port:       port,
		Bootstrap:  bootstrap,
		Expect:     expect,
		Addr:       addr,
		Version:    vsn,
	}
	return true, parts
}

// Returns if a member is a consul node. Returns a bool,
// and the datacenter.
func isConsulNode(m serf.Member) (bool, string) {

@@ -196,49 +196,6 @@ func TestUtil_CanServersUnderstandProtocol(t *testing.T) {
	}
}

func TestIsConsulServer(t *testing.T) {
	m := serf.Member{
		Name: "foo",
		Addr: net.IP([]byte{127, 0, 0, 1}),
		Tags: map[string]string{
			"role": "consul",
			"dc":   "east-aws",
			"port": "10000",
			"vsn":  "1",
		},
	}
	valid, parts := isConsulServer(m)
	if !valid || parts.Datacenter != "east-aws" || parts.Port != 10000 {
		t.Fatalf("bad: %v %v", valid, parts)
	}
	if parts.Name != "foo" {
		t.Fatalf("bad: %v", parts)
	}
	if parts.Bootstrap {
		t.Fatalf("unexpected bootstrap")
	}
	if parts.Expect != 0 {
		t.Fatalf("bad: %v", parts.Expect)
	}
	m.Tags["bootstrap"] = "1"
	valid, parts = isConsulServer(m)
	if !valid || !parts.Bootstrap {
		t.Fatalf("expected bootstrap")
	}
	if parts.Addr.String() != "127.0.0.1:10000" {
		t.Fatalf("bad addr: %v", parts.Addr)
	}
	if parts.Version != 1 {
		t.Fatalf("bad: %v", parts)
	}
	m.Tags["expect"] = "3"
	delete(m.Tags, "bootstrap")
	valid, parts = isConsulServer(m)
	if !valid || parts.Expect != 3 {
		t.Fatalf("bad: %v", parts.Expect)
	}
}

func TestIsConsulNode(t *testing.T) {
	m := serf.Member{
		Tags: map[string]string{

@@ -14,6 +14,10 @@ func RandomStagger(intv time.Duration) time.Duration {
// order to target an aggregate number of actions per second across the whole
// cluster.
func RateScaledInterval(rate float64, min time.Duration, n int) time.Duration {
	const minRate = 1.0 / 86400.0 // 1/(1 * time.Day)
	if rate <= minRate {
		return min
	}
	interval := time.Duration(float64(time.Second) * float64(n) / rate)
	if interval < min {
		return min

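A worked instance of the formula, matching the test that follows (`rateScaledInterval` here is a local sketch of the function above, not an import):

```go
package main

import (
	"fmt"
	"time"
)

// rateScaledInterval mirrors the formula: interval = n/rate seconds,
// floored at min.
func rateScaledInterval(rate float64, min time.Duration, n int) time.Duration {
	interval := time.Duration(float64(time.Second) * float64(n) / rate)
	if interval < min {
		return min
	}
	return interval
}

func main() {
	fmt.Println(rateScaledInterval(200, time.Second, 10000)) // 50s: 10000/200
	fmt.Println(rateScaledInterval(200, time.Second, 100))   // 1s: raw 0.5s floored to min
}
```
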
@@ -16,7 +16,7 @@ func TestRandomStagger(t *testing.T) {
}

func TestRateScaledInterval(t *testing.T) {
	min := 1 * time.Second
	const min = 1 * time.Second
	rate := 200.0
	if v := RateScaledInterval(rate, min, 0); v != min {
		t.Fatalf("Bad: %v", v)

@@ -36,4 +36,13 @@ func TestRateScaledInterval(t *testing.T) {
	if v := RateScaledInterval(rate, min, 10000); v != 50*time.Second {
		t.Fatalf("Bad: %v", v)
	}
	if v := RateScaledInterval(0, min, 10000); v != min {
		t.Fatalf("Bad: %v", v)
	}
	if v := RateScaledInterval(0.0, min, 10000); v != min {
		t.Fatalf("Bad: %v", v)
	}
	if v := RateScaledInterval(-1, min, 10000); v != min {
		t.Fatalf("Bad: %v", v)
	}
}

@@ -12,4 +12,4 @@ go build -o $TEMPDIR/consul || exit 1

# Run the tests
echo "--> Running tests"
go list ./... | grep -v ^github.com/hashicorp/consul/vendor/ | PATH=$TEMPDIR:$PATH xargs -n1 go test
go list ./... | grep -v ^github.com/hashicorp/consul/vendor/ | PATH=$TEMPDIR:$PATH xargs -n1 go test ${GOTEST_FLAGS:-}

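The `${GOTEST_FLAGS:-}` hook added above lets callers pass extra flags through the harness without editing the script, e.g. `GOTEST_FLAGS='-v -run TestRateScaledInterval' ./test.sh` (the script path here is illustrative).
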
@@ -835,6 +835,10 @@ Below is a list of public, open source projects that use Bolt:
  backed by boltdb.
* [buckets](https://github.com/joyrexus/buckets) - a bolt wrapper streamlining
  simple tx and key scans.
* [mbuckets](https://github.com/abhigupta912/mbuckets) - A Bolt wrapper that allows easy operations on multi level (nested) buckets.
* [Request Baskets](https://github.com/darklynx/request-baskets) - A web service to collect arbitrary HTTP requests and inspect them via REST API or simple web UI, similar to [RequestBin](http://requestb.in/) service
* [Go Report Card](https://goreportcard.com/) - Go code quality report cards as a (free and open source) service.
* [Boltdb Boilerplate](https://github.com/bobintornado/boltdb-boilerplate) - Boilerplate wrapper around bolt aiming to make simple calls one-liners.
* [lru](https://github.com/crowdriff/lru) - Easy to use Bolt-backed Least-Recently-Used (LRU) read-through cache with chainable remote stores.

If you are using Bolt in a project please send a pull request to add it to the list.

@@ -0,0 +1,18 @@
version: "{build}"

os: Windows Server 2012 R2

clone_folder: c:\gopath\src\github.com\boltdb\bolt

environment:
  GOPATH: c:\gopath

install:
  - echo %PATH%
  - echo %GOPATH%
  - go version
  - go env
  - go get -v -t ./...

build_script:
  - go test -v ./...

@@ -0,0 +1,9 @@
// +build ppc

package bolt

// maxMapSize represents the largest mmap size supported by Bolt.
const maxMapSize = 0x7FFFFFFF // 2GB

// maxAllocSize is the size used when creating array pointers.
const maxAllocSize = 0xFFFFFFF

@@ -0,0 +1,9 @@
// +build ppc64

package bolt

// maxMapSize represents the largest mmap size supported by Bolt.
const maxMapSize = 0xFFFFFFFFFFFF // 256TB

// maxAllocSize is the size used when creating array pointers.
const maxAllocSize = 0x7FFFFFFF

@@ -11,7 +11,7 @@ import (
)

// flock acquires an advisory lock on a file descriptor.
func flock(f *os.File, exclusive bool, timeout time.Duration) error {
func flock(db *DB, mode os.FileMode, exclusive bool, timeout time.Duration) error {
	var t time.Time
	for {
		// If we're beyond our timeout then return an error.

@@ -27,7 +27,7 @@ func flock(f *os.File, exclusive bool, timeout time.Duration) error {
	}

	// Otherwise attempt to obtain an exclusive lock.
	err := syscall.Flock(int(f.Fd()), flag|syscall.LOCK_NB)
	err := syscall.Flock(int(db.file.Fd()), flag|syscall.LOCK_NB)
	if err == nil {
		return nil
	} else if err != syscall.EWOULDBLOCK {

@@ -40,8 +40,8 @@ func flock(f *os.File, exclusive bool, timeout time.Duration) error {
}

// funlock releases an advisory lock on a file descriptor.
func funlock(f *os.File) error {
	return syscall.Flock(int(f.Fd()), syscall.LOCK_UN)
func funlock(db *DB) error {
	return syscall.Flock(int(db.file.Fd()), syscall.LOCK_UN)
}

// mmap memory maps a DB's data file.

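The new `flock(db, mode, ...)` signature threads the whole DB through so platform code can stash state on it (the Windows lock file below); from the caller's side the deadline is still just `Options.Timeout`. A usage sketch under that assumption:

```go
package main

import (
	"log"
	"time"

	"github.com/boltdb/bolt"
)

func main() {
	// Options.Timeout is the value that ends up as flock's timeout argument:
	// Open blocks until the file lock is acquired or the deadline passes.
	db, err := bolt.Open("my.db", 0600, &bolt.Options{Timeout: 1 * time.Second})
	if err != nil {
		log.Fatal(err) // a second process holding the lock yields a timeout error
	}
	defer db.Close()
}
```
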
@@ -11,7 +11,7 @@ import (
)

// flock acquires an advisory lock on a file descriptor.
func flock(f *os.File, exclusive bool, timeout time.Duration) error {
func flock(db *DB, mode os.FileMode, exclusive bool, timeout time.Duration) error {
	var t time.Time
	for {
		// If we're beyond our timeout then return an error.

@@ -32,7 +32,7 @@ func flock(f *os.File, exclusive bool, timeout time.Duration) error {
	} else {
		lock.Type = syscall.F_RDLCK
	}
	err := syscall.FcntlFlock(f.Fd(), syscall.F_SETLK, &lock)
	err := syscall.FcntlFlock(db.file.Fd(), syscall.F_SETLK, &lock)
	if err == nil {
		return nil
	} else if err != syscall.EAGAIN {

@@ -45,13 +45,13 @@ func flock(f *os.File, exclusive bool, timeout time.Duration) error {
}

// funlock releases an advisory lock on a file descriptor.
func funlock(f *os.File) error {
func funlock(db *DB) error {
	var lock syscall.Flock_t
	lock.Start = 0
	lock.Len = 0
	lock.Type = syscall.F_UNLCK
	lock.Whence = 0
	return syscall.FcntlFlock(uintptr(f.Fd()), syscall.F_SETLK, &lock)
	return syscall.FcntlFlock(uintptr(db.file.Fd()), syscall.F_SETLK, &lock)
}

// mmap memory maps a DB's data file.

@@ -16,6 +16,8 @@ var (
)

const (
	lockExt = ".lock"

	// see https://msdn.microsoft.com/en-us/library/windows/desktop/aa365203(v=vs.85).aspx
	flagLockExclusive       = 2
	flagLockFailImmediately = 1

@@ -46,7 +48,16 @@ func fdatasync(db *DB) error {
}

// flock acquires an advisory lock on a file descriptor.
func flock(f *os.File, exclusive bool, timeout time.Duration) error {
func flock(db *DB, mode os.FileMode, exclusive bool, timeout time.Duration) error {
	// Create a separate lock file on windows because a process
	// cannot share an exclusive lock on the same file. This is
	// needed during Tx.WriteTo().
	f, err := os.OpenFile(db.path+lockExt, os.O_CREATE, mode)
	if err != nil {
		return err
	}
	db.lockfile = f

	var t time.Time
	for {
		// If we're beyond our timeout then return an error.

@@ -62,7 +73,7 @@ func flock(f *os.File, exclusive bool, timeout time.Duration) error {
		flag |= flagLockExclusive
	}

	err := lockFileEx(syscall.Handle(f.Fd()), flag, 0, 1, 0, &syscall.Overlapped{})
	err := lockFileEx(syscall.Handle(db.lockfile.Fd()), flag, 0, 1, 0, &syscall.Overlapped{})
	if err == nil {
		return nil
	} else if err != errLockViolation {

@@ -75,8 +86,11 @@ func flock(f *os.File, exclusive bool, timeout time.Duration) error {
}

// funlock releases an advisory lock on a file descriptor.
func funlock(f *os.File) error {
	return unlockFileEx(syscall.Handle(f.Fd()), 0, 1, 0, &syscall.Overlapped{})
func funlock(db *DB) error {
	err := unlockFileEx(syscall.Handle(db.lockfile.Fd()), 0, 1, 0, &syscall.Overlapped{})
	db.lockfile.Close()
	os.Remove(db.path + lockExt)
	return err
}

// mmap memory maps a DB's data file.

File diff suppressed because it is too large

@@ -93,6 +93,7 @@ type DB struct {

	path     string
	file     *os.File
	lockfile *os.File // windows only
	dataref  []byte   // mmap'ed readonly, write throws SEGV
	data     *[maxMapSize]byte
	datasz   int

@@ -177,7 +178,7 @@ func Open(path string, mode os.FileMode, options *Options) (*DB, error) {
	// if !options.ReadOnly.
	// The database file is locked using the shared lock (more than one process may
	// hold a lock at the same time) otherwise (options.ReadOnly is set).
	if err := flock(db.file, !db.readOnly, options.Timeout); err != nil {
	if err := flock(db, mode, !db.readOnly, options.Timeout); err != nil {
		_ = db.close()
		return nil, err
	}

@@ -379,6 +380,10 @@ func (db *DB) Close() error {
}

func (db *DB) close() error {
	if !db.opened {
		return nil
	}

	db.opened = false

	db.freelist = nil

@@ -397,7 +402,7 @@ func (db *DB) close() error {
	// No need to unlock read-only file.
	if !db.readOnly {
		// Unlock the file.
		if err := funlock(db.file); err != nil {
		if err := funlock(db); err != nil {
			log.Printf("bolt.Close(): funlock error: %s", err)
		}
	}

@@ -824,8 +829,10 @@ func (db *DB) grow(sz int) error {
	// Truncate and fsync to ensure file size metadata is flushed.
	// https://github.com/boltdb/bolt/issues/284
	if !db.NoGrowSync && !db.readOnly {
		if err := db.file.Truncate(int64(sz)); err != nil {
			return fmt.Errorf("file resize error: %s", err)
		if runtime.GOOS != "windows" {
			if err := db.file.Truncate(int64(sz)); err != nil {
				return fmt.Errorf("file resize error: %s", err)
			}
		}
		if err := db.file.Sync(); err != nil {
			return fmt.Errorf("file sync error: %s", err)

@@ -463,43 +463,6 @@ func (n *node) rebalance() {
		target = n.prevSibling()
	}

	// If target node has extra nodes then just move one over.
	if target.numChildren() > target.minKeys() {
		if useNextSibling {
			// Reparent and move node.
			if child, ok := n.bucket.nodes[target.inodes[0].pgid]; ok {
				child.parent.removeChild(child)
				child.parent = n
				child.parent.children = append(child.parent.children, child)
			}
			n.inodes = append(n.inodes, target.inodes[0])
			target.inodes = target.inodes[1:]

			// Update target key on parent.
			target.parent.put(target.key, target.inodes[0].key, nil, target.pgid, 0)
			target.key = target.inodes[0].key
			_assert(len(target.key) > 0, "rebalance(1): zero-length node key")
		} else {
			// Reparent and move node.
			if child, ok := n.bucket.nodes[target.inodes[len(target.inodes)-1].pgid]; ok {
				child.parent.removeChild(child)
				child.parent = n
				child.parent.children = append(child.parent.children, child)
			}
			n.inodes = append(n.inodes, inode{})
			copy(n.inodes[1:], n.inodes)
			n.inodes[0] = target.inodes[len(target.inodes)-1]
			target.inodes = target.inodes[:len(target.inodes)-1]
		}

		// Update parent key for node.
		n.parent.put(n.key, n.inodes[0].key, nil, n.pgid, 0)
		n.key = n.inodes[0].key
		_assert(len(n.key) > 0, "rebalance(2): zero-length node key")

		return
	}

	// If both this node and the target node are too small then merge them.
	if useNextSibling {
		// Reparent all child nodes being moved.

@@ -5,6 +5,7 @@ import (
	"io"
	"os"
	"sort"
	"strings"
	"time"
	"unsafe"
)

@@ -202,8 +203,17 @@ func (tx *Tx) Commit() error {
	// If strict mode is enabled then perform a consistency check.
	// Only the first consistency error is reported in the panic.
	if tx.db.StrictMode {
		if err, ok := <-tx.Check(); ok {
			panic("check fail: " + err.Error())
		ch := tx.Check()
		var errs []string
		for {
			err, ok := <-ch
			if !ok {
				break
			}
			errs = append(errs, err.Error())
		}
		if len(errs) > 0 {
			panic("check fail: " + strings.Join(errs, "\n"))
		}
	}

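The rewritten block drains the whole `tx.Check()` channel so a strict-mode panic reports every consistency error at once rather than only the first. Enabling it is a one-field switch; a minimal sketch:

```go
package main

import (
	"log"

	"github.com/boltdb/bolt"
)

func main() {
	db, err := bolt.Open("my.db", 0600, nil)
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// With the change above, a failing Commit in strict mode panics with
	// every consistency error joined by newlines, not just the first one.
	db.StrictMode = true
}
```
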
@@ -297,12 +307,34 @@ func (tx *Tx) WriteTo(w io.Writer) (n int64, err error) {
	}
	defer func() { _ = f.Close() }()

	// Copy the meta pages.
	tx.db.metalock.Lock()
	n, err = io.CopyN(w, f, int64(tx.db.pageSize*2))
	tx.db.metalock.Unlock()
	// Generate a meta page. We use the same page data for both meta pages.
	buf := make([]byte, tx.db.pageSize)
	page := (*page)(unsafe.Pointer(&buf[0]))
	page.flags = metaPageFlag
	*page.meta() = *tx.meta

	// Write meta 0.
	page.id = 0
	page.meta().checksum = page.meta().sum64()
	nn, err := w.Write(buf)
	n += int64(nn)
	if err != nil {
		return n, fmt.Errorf("meta copy: %s", err)
		return n, fmt.Errorf("meta 0 copy: %s", err)
	}

	// Write meta 1 with a lower transaction id.
	page.id = 1
	page.meta().txid -= 1
	page.meta().checksum = page.meta().sum64()
	nn, err = w.Write(buf)
	n += int64(nn)
	if err != nil {
		return n, fmt.Errorf("meta 1 copy: %s", err)
	}

	// Move past the meta pages in the file.
	if _, err := f.Seek(int64(tx.db.pageSize*2), os.SEEK_SET); err != nil {
		return n, fmt.Errorf("seek: %s", err)
	}

	// Copy data pages.

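Because WriteTo now synthesizes both meta pages from the transaction's own meta (writing meta 1 with a lower txid so meta 0 wins on recovery), a read transaction can stream a consistent backup while the database stays live. A usage sketch (file names are illustrative):

```go
package main

import (
	"log"
	"os"

	"github.com/boltdb/bolt"
)

func main() {
	db, err := bolt.Open("my.db", 0600, nil)
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// Stream a consistent snapshot while readers and writers proceed;
	// WriteTo fabricates both meta pages from this transaction's meta.
	err = db.View(func(tx *bolt.Tx) error {
		f, err := os.Create("backup.db")
		if err != nil {
			return err
		}
		defer f.Close()
		_, err = tx.WriteTo(f)
		return err
	})
	if err != nil {
		log.Fatal(err)
	}
}
```
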
@@ -1,27 +0,0 @@
package coordinate

import (
	"math"
	"testing"
)

// verifyEqualFloats will compare f1 and f2 and fail if they are not
// "equal" within a threshold.
func verifyEqualFloats(t *testing.T, f1 float64, f2 float64) {
	const zeroThreshold = 1.0e-6
	if math.Abs(f1-f2) > zeroThreshold {
		t.Fatalf("equal assertion fail, %9.6f != %9.6f", f1, f2)
	}
}

// verifyEqualVectors will compare vec1 and vec2 and fail if they are not
// "equal" within a threshold.
func verifyEqualVectors(t *testing.T, vec1 []float64, vec2 []float64) {
	if len(vec1) != len(vec2) {
		t.Fatalf("vector length mismatch, %d != %d", len(vec1), len(vec2))
	}

	for i, _ := range vec1 {
		verifyEqualFloats(t, vec1[i], vec2[i])
	}
}

@@ -214,8 +214,8 @@ type queries struct {
}

const (
	UserEventSizeLimit = 512 // Maximum byte size for event name and payload
	snapshotSizeLimit = 128 * 1024 // Maximum 128 KB snapshot
	UserEventSizeLimit = 512        // Maximum byte size for event name and payload
	snapshotSizeLimit  = 128 * 1024 // Maximum 128 KB snapshot
)

// Create creates a new Serf instance, starting all the background tasks

@@ -1680,3 +1680,13 @@ func (s *Serf) GetCachedCoordinate(name string) (coord *coordinate.Coordinate, ok bool) {

	return nil, false
}

// NumNodes returns the number of nodes in the serf cluster, regardless of
// their health or status.
func (s *Serf) NumNodes() (numNodes int) {
	s.memberLock.RLock()
	numNodes = len(s.members)
	s.memberLock.RUnlock()

	return numNodes
}

@@ -240,16 +240,16 @@ applies a failover policy to it:

```javascript
{
  "Name": "",
  "Template" {
    "Type": "name_prefix_match",
  },
  "Service": {
    "Service": "${name.full}",
    "Failover": {
      "NearestN": 3,
    }
  }
  "Name": "",
  "Template": {
    "Type": "name_prefix_match"
  },
  "Service": {
    "Service": "${name.full}",
    "Failover": {
      "NearestN": 3
    }
  }
}
```

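Read together, the corrected template is a catch-all: `name_prefix_match` with an empty `Name` matches every query name, `${name.full}` resolves it to the service of the same name, and `NearestN: 3` fails over to up to the three nearest datacenters.
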
@@ -396,7 +396,7 @@ Consul will not enable TLS for the HTTP API unless the `https` port has been assigned.
  This is a nested setting that allows the following keys:
  * `serf_lan` - The SerfLan address. Accepts values in the form of "host:port" like "10.23.31.101:8301".
  * `serf_wan` - The SerfWan address. Accepts values in the form of "host:port" like "10.23.31.101:8302".
  * `rpc` - The RPC address. Accepts values in the form of "host:port" like "10.23.31.101:8400".
  * `rpc` - The server RPC address. Accepts values in the form of "host:port" like "10.23.31.101:8300".

* <a name="advertise_addr_wan"></a><a href="#advertise_addr_wan">`advertise_addr_wan`</a> Equivalent to
  the [`-advertise-wan` command-line flag](#_advertise-wan).

@@ -498,9 +498,13 @@ Consul will not enable TLS for the HTTP API unless the `https` port has been assigned.
  UDP response, will set the truncated flag, indicating to clients that they should re-query
  using TCP to get the full set of records.

* <a name="only_passing"></a><a href="#only_passing">`only_passing`</a> If set to true, any
  nodes whose healthchecks are not passing will be excluded from DNS results. By default (or
  if set to false), only nodes whose healthchecks are failing as critical will be excluded.
* <a name="only_passing"></a><a href="#only_passing">`only_passing`</a>
  When set to true, the default, the querying agent will only receive node
  or service addresses for healthy services. A healthy service is defined
  as a service that has one or more healthchecks, all of which are in a
  passing or warning state (i.e. not critical). Set to false to have the
  querying agent include all node and service addresses regardless of the
  health of the service.

* <a name="domain"></a><a href="#domain">`domain`</a> Equivalent to the
  [`-domain` command-line flag](#_domain).