Merge pull request #2101 from hashicorp/f-update-serf-memberlist

Updates to latest Serf/memberlist to get lifeguard and TCP joins over DNS.
This commit is contained in:
James Phillips 2016-06-07 14:46:26 -07:00
commit 4ea2e35ff1
10 changed files with 656 additions and 165 deletions

10
Godeps/Godeps.json generated
View File

@ -206,7 +206,7 @@
},
{
"ImportPath": "github.com/hashicorp/memberlist",
"Rev": "cef12ad58224d55cf26caa9e3d239c2fcb3432a2"
"Rev": "215aec831f03c9b7c61ac183d3e28fff3c7d3a37"
},
{
"ImportPath": "github.com/hashicorp/net-rpc-msgpackrpc",
@ -226,13 +226,13 @@
},
{
"ImportPath": "github.com/hashicorp/serf/coordinate",
"Comment": "v0.7.0-62-gb60a6d9",
"Rev": "b60a6d928fe726a588f79a1d500582507f9d79de"
"Comment": "v0.7.0-64-gdce30f1",
"Rev": "dce30f1c7806bf2d96478abb983c53af0e4c8fb2"
},
{
"ImportPath": "github.com/hashicorp/serf/serf",
"Comment": "v0.7.0-62-gb60a6d9",
"Rev": "b60a6d928fe726a588f79a1d500582507f9d79de"
"Comment": "v0.7.0-64-gdce30f1",
"Rev": "dce30f1c7806bf2d96478abb983c53af0e4c8fb2"
},
{
"ImportPath": "github.com/hashicorp/yamux",

View File

@ -82,7 +82,7 @@ least one existing member in order to join the cluster. The new member
does a full state sync with the existing member over TCP and begins gossiping its
existence to the cluster.
Gossip is done over UDP to a with a configurable but fixed fanout and interval.
Gossip is done over UDP with a configurable but fixed fanout and interval.
This ensures that network usage is constant with regards to number of nodes, as opposed to
exponential growth that can occur with traditional heartbeat mechanisms.
Complete state exchanges with a random node are done periodically over

69
vendor/github.com/hashicorp/memberlist/awareness.go generated vendored Normal file
View File

@ -0,0 +1,69 @@
package memberlist
import (
"sync"
"time"
"github.com/armon/go-metrics"
)
// awareness manages a simple metric for tracking the estimated health of the
// local node. Health is primary the node's ability to respond in the soft
// real-time manner required for correct health checking of other nodes in the
// cluster.
type awareness struct {
sync.RWMutex
// max is the upper threshold for the timeout scale (the score will be
// constrained to be from 0 <= score < max).
max int
// score is the current awareness score. Lower values are healthier and
// zero is the minimum value.
score int
}
// newAwareness returns a new awareness object.
func newAwareness(max int) *awareness {
return &awareness{
max: max,
score: 0,
}
}
// ApplyDelta takes the given delta and applies it to the score in a thread-safe
// manner. It also enforces a floor of zero and a max of max, so deltas may not
// change the overall score if it's railed at one of the extremes.
func (a *awareness) ApplyDelta(delta int) {
a.Lock()
initial := a.score
a.score += delta
if a.score < 0 {
a.score = 0
} else if a.score > (a.max - 1) {
a.score = (a.max - 1)
}
final := a.score
a.Unlock()
if initial != final {
metrics.SetGauge([]string{"memberlist", "health", "score"}, float32(final))
}
}
// GetHealthScore returns the raw health score.
func (a *awareness) GetHealthScore() int {
a.RLock()
score := a.score
a.RUnlock()
return score
}
// ScaleTimeout takes the given duration and scales it based on the current
// score. Less healthyness will lead to longer timeouts.
func (a *awareness) ScaleTimeout(timeout time.Duration) time.Duration {
a.RLock()
score := a.score
a.RUnlock()
return timeout * (time.Duration(score) + 1)
}

View File

@ -63,6 +63,23 @@ type Config struct {
// still alive.
SuspicionMult int
// SuspicionMaxTimeoutMult is the multiplier applied to the
// SuspicionTimeout used as an upper bound on detection time. This max
// timeout is calculated using the formula:
//
// SuspicionMaxTimeout = SuspicionMaxTimeoutMult * SuspicionTimeout
//
// If everything is working properly, confirmations from other nodes will
// accelerate suspicion timers in a manner which will cause the timeout
// to reach the base SuspicionTimeout before that elapses, so this value
// will typically only come into play if a node is experiencing issues
// communicating with other nodes. It should be set to a something fairly
// large so that a node having problems will have a lot of chances to
// recover before falsely declaring other nodes as failed, but short
// enough for a legitimately isolated node to still make progress marking
// nodes failed in a reasonable amount of time.
SuspicionMaxTimeoutMult int
// PushPullInterval is the interval between complete state syncs.
// Complete state syncs are done with a single node over TCP and are
// quite expensive relative to standard gossiped messages. Setting this
@ -91,6 +108,11 @@ type Config struct {
// indirect UDP pings.
DisableTcpPings bool
// AwarenessMaxMultiplier will increase the probe interval if the node
// becomes aware that it might be degraded and not meeting the soft real
// time requirements to reliably probe other nodes.
AwarenessMaxMultiplier int
// GossipInterval and GossipNodes are used to configure the gossip
// behavior of memberlist.
//
@ -143,6 +165,10 @@ type Config struct {
Ping PingDelegate
Alive AliveDelegate
// DNSConfigPath points to the system's DNS config file, usually located
// at /etc/resolv.conf. It can be overridden via config for easier testing.
DNSConfigPath string
// LogOutput is the writer where logs should be sent. If this is not
// set, logging will go to stderr by default. You cannot specify both LogOutput
// and Logger at the same time.
@ -174,10 +200,12 @@ func DefaultLANConfig() *Config {
IndirectChecks: 3, // Use 3 nodes for the indirect ping
RetransmitMult: 4, // Retransmit a message 4 * log(N+1) nodes
SuspicionMult: 5, // Suspect a node for 5 * log(N+1) * Interval
SuspicionMaxTimeoutMult: 6, // For 10k nodes this will give a max timeout of 120 seconds
PushPullInterval: 30 * time.Second, // Low frequency
ProbeTimeout: 500 * time.Millisecond, // Reasonable RTT time for LAN
ProbeInterval: 1 * time.Second, // Failure check every second
DisableTcpPings: false, // TCP pings are safe, even with mixed versions
AwarenessMaxMultiplier: 8, // Probe interval backs off to 8 seconds
GossipNodes: 3, // Gossip to 3 nodes
GossipInterval: 200 * time.Millisecond, // Gossip more rapidly
@ -185,8 +213,9 @@ func DefaultLANConfig() *Config {
EnableCompression: true, // Enable compression by default
SecretKey: nil,
Keyring: nil,
DNSConfigPath: "/etc/resolv.conf",
}
}

View File

@ -19,7 +19,8 @@ type Delegate interface {
// It can return a list of buffers to send. Each buffer should assume an
// overhead as provided with a limit on the total byte size allowed.
// The total byte size of the resulting data to send must not exceed
// the limit.
// the limit. Care should be taken that this method does not block,
// since doing so would block the entire UDP packet receive loop.
GetBroadcasts(overhead, limit int) [][]byte
// LocalState is used for a TCP Push/Pull. This is sent to

View File

@ -20,8 +20,12 @@ import (
"net"
"os"
"strconv"
"strings"
"sync"
"time"
"github.com/hashicorp/go-multierror"
"github.com/miekg/dns"
)
type Memberlist struct {
@ -42,6 +46,8 @@ type Memberlist struct {
nodeLock sync.RWMutex
nodes []*nodeState // Known nodes
nodeMap map[string]*nodeState // Maps Addr.String() -> NodeState
nodeTimers map[string]*suspicion // Maps Addr.String() -> suspicion timer
awareness *awareness
tickerLock sync.Mutex
tickers []*time.Ticker
@ -57,7 +63,7 @@ type Memberlist struct {
}
// newMemberlist creates the network listeners.
// Does not schedule execution of background maintenence.
// Does not schedule execution of background maintenance.
func newMemberlist(conf *Config) (*Memberlist, error) {
if conf.ProtocolVersion < ProtocolVersionMin {
return nil, fmt.Errorf("Protocol version '%d' too low. Must be in range: [%d, %d]",
@ -125,6 +131,8 @@ func newMemberlist(conf *Config) (*Memberlist, error) {
tcpListener: tcpLn,
handoff: make(chan msgHandoff, 1024),
nodeMap: make(map[string]*nodeState),
nodeTimers: make(map[string]*suspicion),
awareness: newAwareness(conf.AwarenessMaxMultiplier),
ackHandlers: make(map[uint32]*ackHandler),
broadcasts: &TransmitLimitedQueue{RetransmitMult: conf.RetransmitMult},
logger: logger,
@ -166,72 +174,152 @@ func Create(conf *Config) (*Memberlist, error) {
// none could be reached. If an error is returned, the node did not successfully
// join the cluster.
func (m *Memberlist) Join(existing []string) (int, error) {
// Attempt to join any of them
numSuccess := 0
var retErr error
var errs error
for _, exist := range existing {
addrs, port, err := m.resolveAddr(exist)
addrs, err := m.resolveAddr(exist)
if err != nil {
m.logger.Printf("[WARN] memberlist: Failed to resolve %s: %v", exist, err)
retErr = err
err = fmt.Errorf("Failed to resolve %s: %v", exist, err)
errs = multierror.Append(errs, err)
m.logger.Printf("[WARN] memberlist: %v", err)
continue
}
for _, addr := range addrs {
if err := m.pushPullNode(addr, port, true); err != nil {
retErr = err
if err := m.pushPullNode(addr.ip, addr.port, true); err != nil {
err = fmt.Errorf("Failed to join %s: %v", addr.ip, err)
errs = multierror.Append(errs, err)
m.logger.Printf("[DEBUG] memberlist: %v", err)
continue
}
numSuccess++
}
}
if numSuccess > 0 {
retErr = nil
errs = nil
}
return numSuccess, errs
}
// ipPort holds information about a node we want to try to join.
type ipPort struct {
ip net.IP
port uint16
}
// tcpLookupIP is a helper to initiate a TCP-based DNS lookup for the given host.
// The built-in Go resolver will do a UDP lookup first, and will only use TCP if
// the response has the truncate bit set, which isn't common on DNS servers like
// Consul's. By doing the TCP lookup directly, we get the best chance for the
// largest list of hosts to join. Since joins are relatively rare events, it's ok
// to do this rather expensive operation.
func (m *Memberlist) tcpLookupIP(host string, defaultPort uint16) ([]ipPort, error) {
// Don't attempt any TCP lookups against non-fully qualified domain
// names, since those will likely come from the resolv.conf file.
if !strings.Contains(host, ".") {
return nil, nil
}
return numSuccess, retErr
// Make sure the domain name is terminated with a dot (we know there's
// at least one character at this point).
dn := host
if dn[len(dn)-1] != '.' {
dn = dn + "."
}
// See if we can find a server to try.
cc, err := dns.ClientConfigFromFile(m.config.DNSConfigPath)
if err != nil {
return nil, err
}
if len(cc.Servers) > 0 {
// We support host:port in the DNS config, but need to add the
// default port if one is not supplied.
server := cc.Servers[0]
if !hasPort(server) {
server = net.JoinHostPort(server, cc.Port)
}
// Do the lookup.
c := new(dns.Client)
c.Net = "tcp"
msg := new(dns.Msg)
msg.SetQuestion(dn, dns.TypeANY)
in, _, err := c.Exchange(msg, server)
if err != nil {
return nil, err
}
// Handle any IPs we get back that we can attempt to join.
var ips []ipPort
for _, r := range in.Answer {
switch rr := r.(type) {
case (*dns.A):
ips = append(ips, ipPort{rr.A, defaultPort})
case (*dns.AAAA):
ips = append(ips, ipPort{rr.AAAA, defaultPort})
case (*dns.CNAME):
m.logger.Printf("[DEBUG] memberlist: Ignoring CNAME RR in TCP-first answer for '%s'", host)
}
}
return ips, nil
}
return nil, nil
}
// resolveAddr is used to resolve the address into an address,
// port, and error. If no port is given, use the default
func (m *Memberlist) resolveAddr(hostStr string) ([][]byte, uint16, error) {
ips := make([][]byte, 0)
func (m *Memberlist) resolveAddr(hostStr string) ([]ipPort, error) {
// Normalize the incoming string to host:port so we can apply Go's
// parser to it.
port := uint16(0)
if !hasPort(hostStr) {
hostStr += ":" + strconv.Itoa(m.config.BindPort)
}
host, sport, err := net.SplitHostPort(hostStr)
if ae, ok := err.(*net.AddrError); ok && ae.Err == "missing port in address" {
// error, port missing - we can solve this
port = uint16(m.config.BindPort)
host = hostStr
} else if err != nil {
// error, but not missing port
return ips, port, err
} else if lport, err := strconv.ParseUint(sport, 10, 16); err != nil {
// error, when parsing port
return ips, port, err
} else {
// no error
if err != nil {
return nil, err
}
// This will capture the supplied port, or the default one added above.
lport, err := strconv.ParseUint(sport, 10, 16)
if err != nil {
return nil, err
}
port = uint16(lport)
// If it looks like an IP address we are done. The SplitHostPort() above
// will make sure the host part is in good shape for parsing, even for
// IPv6 addresses.
if ip := net.ParseIP(host); ip != nil {
return []ipPort{ipPort{ip, port}}, nil
}
// Get the addresses that hostPort might resolve to
// ResolveTcpAddr requres ipv6 brackets to separate
// port numbers whereas ParseIP doesn't, but luckily
// SplitHostPort takes care of the brackets
if ip := net.ParseIP(host); ip == nil {
if pre, err := net.LookupIP(host); err == nil {
for _, ip := range pre {
ips = append(ips, ip)
// First try TCP so we have the best chance for the largest list of
// hosts to join. If this fails it's not fatal since this isn't a standard
// way to query DNS, and we have a fallback below.
ips, err := m.tcpLookupIP(host, port)
if err != nil {
m.logger.Printf("[DEBUG] memberlist: TCP-first lookup failed for '%s', falling back to UDP: %s", hostStr, err)
}
} else {
return ips, port, err
}
} else {
ips = append(ips, ip)
if len(ips) > 0 {
return ips, nil
}
return ips, port, nil
// If TCP didn't yield anything then use the normal Go resolver which
// will try UDP, then might possibly try TCP again if the UDP response
// indicates it was truncated.
ans, err := net.LookupIP(host)
if err != nil {
return nil, err
}
ips = make([]ipPort, 0, len(ans))
for _, ip := range ans {
ips = append(ips, ipPort{ip, port})
}
return ips, nil
}
// setAlive is used to mark this node as being alive. This is the same
@ -541,6 +629,13 @@ func (m *Memberlist) anyAlive() bool {
return false
}
// GetHealthScore gives this instance's idea of how well it is meeting the soft
// real-time requirements of the protocol. Lower numbers are better, and zero
// means "totally healthy".
func (m *Memberlist) GetHealthScore() int {
return m.awareness.GetHealthScore()
}
// ProtocolVersion returns the protocol version currently in use by
// this memberlist.
func (m *Memberlist) ProtocolVersion() uint8 {

View File

@ -24,9 +24,15 @@ const (
// A memberlist speaking version 2 of the protocol will attempt
// to TCP ping another memberlist who understands version 3 or
// greater.
//
// Version 4 added support for nacks as part of indirect probes.
// A memberlist speaking version 2 of the protocol will expect
// nacks from another memberlist who understands version 4 or
// greater, and likewise nacks will be sent to memberlists who
// understand version 4 or greater.
ProtocolVersion2Compatible = 2
ProtocolVersionMax = 3
ProtocolVersionMax = 4
)
// messageType is an integer ID of a type of message that can be received
@ -46,6 +52,7 @@ const (
userMsg // User mesg, not handled by us
compressMsg
encryptMsg
nackRespMsg
)
// compressionType is used to specify the compression algorithm
@ -83,6 +90,7 @@ type indirectPingReq struct {
Target []byte
Port uint16
Node string
Nack bool // true if we'd like a nack back
}
// ack response is sent for a ping
@ -91,6 +99,13 @@ type ackResp struct {
Payload []byte
}
// nack response is sent for an indirect ping when the pinger doesn't hear from
// the ping-ee within the configured timeout. This lets the original node know
// that the indirect ping attempt happened but didn't succeed.
type nackResp struct {
SeqNo uint32
}
// suspect is broadcast when we suspect a node is dead
type suspect struct {
Incarnation uint32
@ -121,7 +136,7 @@ type dead struct {
}
// pushPullHeader is used to inform the
// otherside how many states we are transfering
// otherside how many states we are transferring
type pushPullHeader struct {
Nodes int
UserStateLen int // Encodes the byte lengh of user state
@ -134,7 +149,7 @@ type userMsgHeader struct {
}
// pushNodeState is used for pushPullReq when we are
// transfering out node states
// transferring out node states
type pushNodeState struct {
Name string
Addr []byte
@ -343,6 +358,8 @@ func (m *Memberlist) handleCommand(buf []byte, from net.Addr, timestamp time.Tim
m.handleIndirectPing(buf, from)
case ackRespMsg:
m.handleAck(buf, from, timestamp)
case nackRespMsg:
m.handleNack(buf, from)
case suspectMsg:
fallthrough
@ -440,18 +457,23 @@ func (m *Memberlist) handleIndirectPing(buf []byte, from net.Addr) {
}
// For proto versions < 2, there is no port provided. Mask old
// behavior by using the configured port
// behavior by using the configured port.
if m.ProtocolVersion() < 2 || ind.Port == 0 {
ind.Port = uint16(m.config.BindPort)
}
// Send a ping to the correct host
// Send a ping to the correct host.
localSeqNo := m.nextSeqNo()
ping := ping{SeqNo: localSeqNo, Node: ind.Node}
destAddr := &net.UDPAddr{IP: ind.Target, Port: int(ind.Port)}
// Setup a response handler to relay the ack
cancelCh := make(chan struct{})
respHandler := func(payload []byte, timestamp time.Time) {
// Try to prevent the nack if we've caught it in time.
close(cancelCh)
// Forward the ack back to the requestor.
ack := ackResp{ind.SeqNo, nil}
if err := m.encodeAndSendMsg(from, ackRespMsg, &ack); err != nil {
m.logger.Printf("[ERR] memberlist: Failed to forward ack: %s %s", err, LogAddress(from))
@ -459,10 +481,25 @@ func (m *Memberlist) handleIndirectPing(buf []byte, from net.Addr) {
}
m.setAckHandler(localSeqNo, respHandler, m.config.ProbeTimeout)
// Send the ping
// Send the ping.
if err := m.encodeAndSendMsg(destAddr, pingMsg, &ping); err != nil {
m.logger.Printf("[ERR] memberlist: Failed to send ping: %s %s", err, LogAddress(from))
}
// Setup a timer to fire off a nack if no ack is seen in time.
if ind.Nack {
go func() {
select {
case <-cancelCh:
return
case <-time.After(m.config.ProbeTimeout):
nack := nackResp{ind.SeqNo}
if err := m.encodeAndSendMsg(from, nackRespMsg, &nack); err != nil {
m.logger.Printf("[ERR] memberlist: Failed to send nack: %s %s", err, LogAddress(from))
}
}
}()
}
}
func (m *Memberlist) handleAck(buf []byte, from net.Addr, timestamp time.Time) {
@ -474,6 +511,15 @@ func (m *Memberlist) handleAck(buf []byte, from net.Addr, timestamp time.Time) {
m.invokeAckHandler(ack, timestamp)
}
func (m *Memberlist) handleNack(buf []byte, from net.Addr) {
var nack nackResp
if err := decode(buf, &nack); err != nil {
m.logger.Printf("[ERR] memberlist: Failed to decode nack response: %s %s", err, LogAddress(from))
return
}
m.invokeNackHandler(nack)
}
func (m *Memberlist) handleSuspect(buf []byte, from net.Addr) {
var sus suspect
if err := decode(buf, &sus); err != nil {

View File

@ -42,9 +42,10 @@ type nodeState struct {
StateChange time.Time // Time last state change happened
}
// ackHandler is used to register handlers for incoming acks
// ackHandler is used to register handlers for incoming acks and nacks.
type ackHandler struct {
handler func([]byte, time.Time)
ackFn func([]byte, time.Time)
nackFn func()
timer *time.Timer
}
@ -148,7 +149,7 @@ func (m *Memberlist) pushPullTrigger(stop <-chan struct{}) {
}
}
// Deschedule is used to stop the background maintenence. This is safe
// Deschedule is used to stop the background maintenance. This is safe
// to call multiple times.
func (m *Memberlist) deschedule() {
m.tickerLock.Lock()
@ -219,24 +220,68 @@ START:
func (m *Memberlist) probeNode(node *nodeState) {
defer metrics.MeasureSince([]string{"memberlist", "probeNode"}, time.Now())
// We use our health awareness to scale the overall probe interval, so we
// slow down if we detect problems. The ticker that calls us can handle
// us running over the base interval, and will skip missed ticks.
probeInterval := m.awareness.ScaleTimeout(m.config.ProbeInterval)
if probeInterval > m.config.ProbeInterval {
metrics.IncrCounter([]string{"memberlist", "degraded", "probe"}, 1)
}
// Prepare a ping message and setup an ack handler.
ping := ping{SeqNo: m.nextSeqNo(), Node: node.Name}
ackCh := make(chan ackMessage, m.config.IndirectChecks+1)
m.setAckChannel(ping.SeqNo, ackCh, m.config.ProbeInterval)
nackCh := make(chan struct{}, m.config.IndirectChecks+1)
m.setProbeChannels(ping.SeqNo, ackCh, nackCh, probeInterval)
// Send a ping to the node.
deadline := time.Now().Add(m.config.ProbeInterval)
// Send a ping to the node. If this node looks like it's suspect or dead,
// also tack on a suspect message so that it has a chance to refute as
// soon as possible.
deadline := time.Now().Add(probeInterval)
destAddr := &net.UDPAddr{IP: node.Addr, Port: int(node.Port)}
if node.State == stateAlive {
if err := m.encodeAndSendMsg(destAddr, pingMsg, &ping); err != nil {
m.logger.Printf("[ERR] memberlist: Failed to send ping: %s", err)
return
}
} else {
var msgs [][]byte
if buf, err := encode(pingMsg, &ping); err != nil {
m.logger.Printf("[ERR] memberlist: Failed to encode ping message: %s", err)
return
} else {
msgs = append(msgs, buf.Bytes())
}
s := suspect{Incarnation: node.Incarnation, Node: node.Name, From: m.config.Name}
if buf, err := encode(suspectMsg, &s); err != nil {
m.logger.Printf("[ERR] memberlist: Failed to encode suspect message: %s", err)
return
} else {
msgs = append(msgs, buf.Bytes())
}
compound := makeCompoundMessage(msgs)
if err := m.rawSendMsgUDP(destAddr, compound.Bytes()); err != nil {
m.logger.Printf("[ERR] memberlist: Failed to send compound ping and suspect message to %s: %s", destAddr, err)
return
}
}
// Mark the sent time here, which should be after any pre-processing and
// system calls to do the actual send. This probably under-reports a bit,
// but it's the best we can do.
sent := time.Now()
// Arrange for our self-awareness to get updated. At this point we've
// sent the ping, so any return statement means the probe succeeded
// which will improve our health until we get to the failure scenarios
// at the end of this function, which will alter this delta variable
// accordingly.
awarenessDelta := -1
defer func() {
m.awareness.ApplyDelta(awarenessDelta)
}()
// Wait for response or round-trip-time.
select {
case v := <-ackCh:
@ -254,6 +299,12 @@ func (m *Memberlist) probeNode(node *nodeState) {
ackCh <- v
}
case <-time.After(m.config.ProbeTimeout):
// Note that we don't scale this timeout based on awareness and
// the health score. That's because we don't really expect waiting
// longer to help get UDP through. Since health does extend the
// probe interval it will give the TCP fallback more time, which
// is more active in dealing with lost packets, and it gives more
// time to wait for indirect acks/nacks.
m.logger.Printf("[DEBUG] memberlist: Failed UDP ping: %v (timeout reached)", node.Name)
}
@ -264,8 +315,15 @@ func (m *Memberlist) probeNode(node *nodeState) {
m.nodeLock.RUnlock()
// Attempt an indirect ping.
expectedNacks := 0
ind := indirectPingReq{SeqNo: ping.SeqNo, Target: node.Addr, Port: node.Port, Node: node.Name}
for _, peer := range kNodes {
// We only expect nack to be sent from peers who understand
// version 4 of the protocol.
if ind.Nack = peer.PMax >= 4; ind.Nack {
expectedNacks++
}
destAddr := &net.UDPAddr{IP: peer.Addr, Port: int(peer.Port)}
if err := m.encodeAndSendMsg(destAddr, indirectPingMsg, &ind); err != nil {
m.logger.Printf("[ERR] memberlist: Failed to send indirect ping: %s", err)
@ -319,7 +377,23 @@ func (m *Memberlist) probeNode(node *nodeState) {
}
}
// No acks received from target, suspect
// Update our self-awareness based on the results of this failed probe.
// If we don't have peers who will send nacks then we penalize for any
// failed probe as a simple health metric. If we do have peers to nack
// verify, then we can use that as a more sophisticated measure of self-
// health because we assume them to be working, and they can help us
// decide if the probed node was really dead or if it was something wrong
// with ourselves.
awarenessDelta = 0
if expectedNacks > 0 {
if nackCount := len(nackCh); nackCount < expectedNacks {
awarenessDelta += 2 * (expectedNacks - nackCount)
}
} else {
awarenessDelta += 1
}
// No acks received from target, suspect it as failed.
m.logger.Printf("[INFO] memberlist: Suspect %s has failed, no acks received", node.Name)
s := suspect{Incarnation: node.Incarnation, Node: node.Name, From: m.config.Name}
m.suspectNode(&s)
@ -330,7 +404,7 @@ func (m *Memberlist) Ping(node string, addr net.Addr) (time.Duration, error) {
// Prepare a ping message and setup an ack handler.
ping := ping{SeqNo: m.nextSeqNo(), Node: node}
ackCh := make(chan ackMessage, m.config.IndirectChecks+1)
m.setAckChannel(ping.SeqNo, ackCh, m.config.ProbeInterval)
m.setProbeChannels(ping.SeqNo, ackCh, nil, m.config.ProbeInterval)
// Send a ping to the node.
if err := m.encodeAndSendMsg(addr, pingMsg, &ping); err != nil {
@ -584,6 +658,11 @@ func (m *Memberlist) nextIncarnation() uint32 {
return atomic.AddUint32(&m.incarnation, 1)
}
// skipIncarnation adds the positive offset to the incarnation number.
func (m *Memberlist) skipIncarnation(offset uint32) uint32 {
return atomic.AddUint32(&m.incarnation, offset)
}
// estNumNodes is used to get the current estimate of the number of nodes
func (m *Memberlist) estNumNodes() int {
return int(atomic.LoadUint32(&m.numNodes))
@ -595,19 +674,27 @@ type ackMessage struct {
Timestamp time.Time
}
// setAckChannel is used to attach a channel to receive a message when an ack with a given
// sequence number is received. The `complete` field of the message will be false on timeout
func (m *Memberlist) setAckChannel(seqNo uint32, ch chan ackMessage, timeout time.Duration) {
// Create a handler function
handler := func(payload []byte, timestamp time.Time) {
// setProbeChannels is used to attach the ackCh to receive a message when an ack
// with a given sequence number is received. The `complete` field of the message
// will be false on timeout. Any nack messages will cause an empty struct to be
// passed to the nackCh, which can be nil if not needed.
func (m *Memberlist) setProbeChannels(seqNo uint32, ackCh chan ackMessage, nackCh chan struct{}, timeout time.Duration) {
// Create handler functions for acks and nacks
ackFn := func(payload []byte, timestamp time.Time) {
select {
case ch <- ackMessage{true, payload, timestamp}:
case ackCh <- ackMessage{true, payload, timestamp}:
default:
}
}
nackFn := func() {
select {
case nackCh <- struct{}{}:
default:
}
}
// Add the handler
ah := &ackHandler{handler, nil}
// Add the handlers
ah := &ackHandler{ackFn, nackFn, nil}
m.ackLock.Lock()
m.ackHandlers[seqNo] = ah
m.ackLock.Unlock()
@ -618,18 +705,19 @@ func (m *Memberlist) setAckChannel(seqNo uint32, ch chan ackMessage, timeout tim
delete(m.ackHandlers, seqNo)
m.ackLock.Unlock()
select {
case ch <- ackMessage{false, nil, time.Now()}:
case ackCh <- ackMessage{false, nil, time.Now()}:
default:
}
})
}
// setAckHandler is used to attach a handler to be invoked when an
// ack with a given sequence number is received. If a timeout is reached,
// the handler is deleted
func (m *Memberlist) setAckHandler(seqNo uint32, handler func([]byte, time.Time), timeout time.Duration) {
// setAckHandler is used to attach a handler to be invoked when an ack with a
// given sequence number is received. If a timeout is reached, the handler is
// deleted. This is used for indirect pings so does not configure a function
// for nacks.
func (m *Memberlist) setAckHandler(seqNo uint32, ackFn func([]byte, time.Time), timeout time.Duration) {
// Add the handler
ah := &ackHandler{handler, nil}
ah := &ackHandler{ackFn, nil, nil}
m.ackLock.Lock()
m.ackHandlers[seqNo] = ah
m.ackLock.Unlock()
@ -642,7 +730,7 @@ func (m *Memberlist) setAckHandler(seqNo uint32, handler func([]byte, time.Time)
})
}
// Invokes an Ack handler if any is associated, and reaps the handler immediately
// Invokes an ack handler if any is associated, and reaps the handler immediately
func (m *Memberlist) invokeAckHandler(ack ackResp, timestamp time.Time) {
m.ackLock.Lock()
ah, ok := m.ackHandlers[ack.SeqNo]
@ -652,7 +740,49 @@ func (m *Memberlist) invokeAckHandler(ack ackResp, timestamp time.Time) {
return
}
ah.timer.Stop()
ah.handler(ack.Payload, timestamp)
ah.ackFn(ack.Payload, timestamp)
}
// Invokes nack handler if any is associated.
func (m *Memberlist) invokeNackHandler(nack nackResp) {
m.ackLock.Lock()
ah, ok := m.ackHandlers[nack.SeqNo]
m.ackLock.Unlock()
if !ok || ah.nackFn == nil {
return
}
ah.nackFn()
}
// refute gossips an alive message in response to incoming information that we
// are suspect or dead. It will make sure the incarnation number beats the given
// accusedInc value, or you can supply 0 to just get the next incarnation number.
// This alters the node state that's passed in so this MUST be called while the
// nodeLock is held.
func (m *Memberlist) refute(me *nodeState, accusedInc uint32) {
// Make sure the incarnation number beats the accusation.
inc := m.nextIncarnation()
if accusedInc >= inc {
inc = m.skipIncarnation(accusedInc - inc + 1)
}
me.Incarnation = inc
// Decrease our health because we are being asked to refute a problem.
m.awareness.ApplyDelta(1)
// Format and broadcast an alive message.
a := alive{
Incarnation: inc,
Node: me.Name,
Addr: me.Addr,
Port: me.Port,
Meta: me.Meta,
Vsn: []uint8{
me.PMin, me.PMax, me.PCur,
me.DMin, me.DMax, me.DCur,
},
}
m.encodeAndBroadcast(me.Addr.String(), aliveMsg, a)
}
// aliveNode is invoked by the network layer when we get a message about a
@ -754,6 +884,9 @@ func (m *Memberlist) aliveNode(a *alive, notify chan struct{}, bootstrap bool) {
return
}
// Clear out any suspicion timer that may be in effect.
delete(m.nodeTimers, a.Node)
// Store the old state and meta data
oldState := state.State
oldMeta := state.Meta
@ -783,21 +916,7 @@ func (m *Memberlist) aliveNode(a *alive, notify chan struct{}, bootstrap bool) {
return
}
inc := m.nextIncarnation()
for a.Incarnation >= inc {
inc = m.nextIncarnation()
}
state.Incarnation = inc
a := alive{
Incarnation: inc,
Node: state.Name,
Addr: state.Addr,
Port: state.Port,
Meta: state.Meta,
Vsn: versions,
}
m.encodeBroadcastNotify(a.Node, aliveMsg, a, notify)
m.refute(state, a.Incarnation)
m.logger.Printf("[WARN] memberlist: Refuting an alive message")
} else {
m.encodeBroadcastNotify(a.Node, aliveMsg, a, notify)
@ -854,6 +973,17 @@ func (m *Memberlist) suspectNode(s *suspect) {
return
}
// See if there's a suspicion timer we can confirm. If the info is new
// to us we will go ahead and re-gossip it. This allows for multiple
// independent confirmations to flow even when a node probes a node
// that's already suspect.
if timer, ok := m.nodeTimers[s.Node]; ok {
if timer.Confirm(s.From) {
m.encodeAndBroadcast(s.Node, suspectMsg, s)
}
return
}
// Ignore non-alive nodes
if state.State != stateAlive {
return
@ -861,24 +991,7 @@ func (m *Memberlist) suspectNode(s *suspect) {
// If this is us we need to refute, otherwise re-broadcast
if state.Name == m.config.Name {
inc := m.nextIncarnation()
for s.Incarnation >= inc {
inc = m.nextIncarnation()
}
state.Incarnation = inc
a := alive{
Incarnation: inc,
Node: state.Name,
Addr: state.Addr,
Port: state.Port,
Meta: state.Meta,
Vsn: []uint8{
state.PMin, state.PMax, state.PCur,
state.DMin, state.DMax, state.DCur,
},
}
m.encodeAndBroadcast(s.Node, aliveMsg, a)
m.refute(state, s.Incarnation)
m.logger.Printf("[WARN] memberlist: Refuting a suspect message (from: %s)", s.From)
return // Do not mark ourself suspect
} else {
@ -894,26 +1007,41 @@ func (m *Memberlist) suspectNode(s *suspect) {
changeTime := time.Now()
state.StateChange = changeTime
// Setup a timeout for this
timeout := suspicionTimeout(m.config.SuspicionMult, m.estNumNodes(), m.config.ProbeInterval)
time.AfterFunc(timeout, func() {
// Setup a suspicion timer. Given that we don't have any known phase
// relationship with our peers, we set up k such that we hit the nominal
// timeout two probe intervals short of what we expect given the suspicion
// multiplier.
k := m.config.SuspicionMult - 2
// If there aren't enough nodes to give the expected confirmations, just
// set k to 0 to say that we don't expect any. Note we subtract 2 from n
// here to take out ourselves and the node being probed.
n := m.estNumNodes()
if n-2 < k {
k = 0
}
// Compute the timeouts based on the size of the cluster.
min := suspicionTimeout(m.config.SuspicionMult, n, m.config.ProbeInterval)
max := time.Duration(m.config.SuspicionMaxTimeoutMult) * min
fn := func(numConfirmations int) {
m.nodeLock.Lock()
state, ok := m.nodeMap[s.Node]
timeout := ok && state.State == stateSuspect && state.StateChange == changeTime
m.nodeLock.Unlock()
if timeout {
m.suspectTimeout(state)
if k > 0 && numConfirmations < k {
metrics.IncrCounter([]string{"memberlist", "degraded", "timeout"}, 1)
}
})
}
// suspectTimeout is invoked when a suspect timeout has occurred
func (m *Memberlist) suspectTimeout(n *nodeState) {
// Construct a dead message
m.logger.Printf("[INFO] memberlist: Marking %s as failed, suspect timeout reached", n.Name)
d := dead{Incarnation: n.Incarnation, Node: n.Name, From: m.config.Name}
m.logger.Printf("[INFO] memberlist: Marking %s as failed, suspect timeout reached (%d peer confirmations)",
state.Name, numConfirmations)
d := dead{Incarnation: state.Incarnation, Node: state.Name, From: m.config.Name}
m.deadNode(&d)
}
}
m.nodeTimers[s.Node] = newSuspicion(s.From, k, min, max, fn)
}
// deadNode is invoked by the network layer when we get a message
@ -933,6 +1061,9 @@ func (m *Memberlist) deadNode(d *dead) {
return
}
// Clear out any suspicion timer that may be in effect.
delete(m.nodeTimers, d.Node)
// Ignore if node is already dead
if state.State == stateDead {
return
@ -942,24 +1073,7 @@ func (m *Memberlist) deadNode(d *dead) {
if state.Name == m.config.Name {
// If we are not leaving we need to refute
if !m.leave {
inc := m.nextIncarnation()
for d.Incarnation >= inc {
inc = m.nextIncarnation()
}
state.Incarnation = inc
a := alive{
Incarnation: inc,
Node: state.Name,
Addr: state.Addr,
Port: state.Port,
Meta: state.Meta,
Vsn: []uint8{
state.PMin, state.PMax, state.PCur,
state.DMin, state.DMax, state.DCur,
},
}
m.encodeAndBroadcast(d.Node, aliveMsg, a)
m.refute(state, d.Incarnation)
m.logger.Printf("[WARN] memberlist: Refuting a dead message (from: %s)", d.From)
return // Do not mark ourself dead
}
@ -1001,7 +1115,7 @@ func (m *Memberlist) mergeState(remote []pushNodeState) {
m.aliveNode(&a, nil, false)
case stateDead:
// If the remote node belives a node is dead, we prefer to
// If the remote node believes a node is dead, we prefer to
// suspect that node instead of declaring it dead instantly
fallthrough
case stateSuspect:

130
vendor/github.com/hashicorp/memberlist/suspicion.go generated vendored Normal file
View File

@ -0,0 +1,130 @@
package memberlist
import (
"math"
"sync/atomic"
"time"
)
// suspicion manages the suspect timer for a node and provides an interface
// to accelerate the timeout as we get more independent confirmations that
// a node is suspect.
type suspicion struct {
// n is the number of independent confirmations we've seen. This must
// be updated using atomic instructions to prevent contention with the
// timer callback.
n int32
// k is the number of independent confirmations we'd like to see in
// order to drive the timer to its minimum value.
k int32
// min is the minimum timer value.
min time.Duration
// max is the maximum timer value.
max time.Duration
// start captures the timestamp when we began the timer. This is used
// so we can calculate durations to feed the timer during updates in
// a way the achieves the overall time we'd like.
start time.Time
// timer is the underlying timer that implements the timeout.
timer *time.Timer
// f is the function to call when the timer expires. We hold on to this
// because there are cases where we call it directly.
timeoutFn func()
// confirmations is a map of "from" nodes that have confirmed a given
// node is suspect. This prevents double counting.
confirmations map[string]struct{}
}
// newSuspicion returns a timer started with the max time, and that will drive
// to the min time after seeing k or more confirmations. The from node will be
// excluded from confirmations since we might get our own suspicion message
// gossiped back to us. The minimum time will be used if no confirmations are
// called for (k <= 0).
func newSuspicion(from string, k int, min time.Duration, max time.Duration, fn func(int)) *suspicion {
s := &suspicion{
k: int32(k),
min: min,
max: max,
confirmations: make(map[string]struct{}),
}
// Exclude the from node from any confirmations.
s.confirmations[from] = struct{}{}
// Pass the number of confirmations into the timeout function for
// easy telemetry.
s.timeoutFn = func() {
fn(int(atomic.LoadInt32(&s.n)))
}
// If there aren't any confirmations to be made then take the min
// time from the start.
timeout := max
if k < 1 {
timeout = min
}
s.timer = time.AfterFunc(timeout, s.timeoutFn)
// Capture the start time right after starting the timer above so
// we should always err on the side of a little longer timeout if
// there's any preemption that separates this and the step above.
s.start = time.Now()
return s
}
// remainingSuspicionTime takes the state variables of the suspicion timer and
// calculates the remaining time to wait before considering a node dead. The
// return value can be negative, so be prepared to fire the timer immediately in
// that case.
func remainingSuspicionTime(n, k int32, elapsed time.Duration, min, max time.Duration) time.Duration {
frac := math.Log(float64(n)+1.0) / math.Log(float64(k)+1.0)
raw := max.Seconds() - frac*(max.Seconds()-min.Seconds())
timeout := time.Duration(math.Floor(1000.0*raw)) * time.Millisecond
if timeout < min {
timeout = min
}
// We have to take into account the amount of time that has passed so
// far, so we get the right overall timeout.
return timeout - elapsed
}
// Confirm registers that a possibly new peer has also determined the given
// node is suspect. This returns true if this was new information, and false
// if it was a duplicate confirmation, or if we've got enough confirmations to
// hit the minimum.
func (s *suspicion) Confirm(from string) bool {
// If we've got enough confirmations then stop accepting them.
if atomic.LoadInt32(&s.n) >= s.k {
return false
}
// Only allow one confirmation from each possible peer.
if _, ok := s.confirmations[from]; ok {
return false
}
s.confirmations[from] = struct{}{}
// Compute the new timeout given the current number of confirmations and
// adjust the timer. If the timeout becomes negative *and* we can cleanly
// stop the timer then we will call the timeout function directly from
// here.
n := atomic.AddInt32(&s.n, 1)
elapsed := time.Now().Sub(s.start)
remaining := remainingSuspicionTime(n, s.k, elapsed, s.min, s.max)
if s.timer.Stop() {
if remaining > 0 {
s.timer.Reset(remaining)
} else {
go s.timeoutFn()
}
}
return true
}

View File

@ -9,6 +9,7 @@ import (
"math"
"math/rand"
"net"
"strings"
"time"
"github.com/hashicorp/go-msgpack/codec"
@ -326,6 +327,12 @@ func isLoopbackIP(ip_str string) bool {
return loopbackBlock.Contains(ip)
}
// Given a string of the form "host", "host:port", or "[ipv6::address]:port",
// return true if the string includes a port.
func hasPort(s string) bool {
return strings.LastIndex(s, ":") > strings.LastIndex(s, "]")
}
// compressPayload takes an opaque input buffer, compresses it
// and wraps it in a compress{} message that is encoded.
func compressPayload(inp []byte) (*bytes.Buffer, error) {