From 785cb70cd35974c3556067130f70ffa66cf6b634 Mon Sep 17 00:00:00 2001 From: James Phillips Date: Tue, 7 Jun 2016 14:30:47 -0700 Subject: [PATCH] Updates to latest Serf/memberlist to get lifeguard and TCP joins over DNS. --- Godeps/Godeps.json | 10 +- .../github.com/hashicorp/memberlist/README.md | 2 +- .../hashicorp/memberlist/awareness.go | 69 ++++ .../github.com/hashicorp/memberlist/config.go | 59 +++- .../hashicorp/memberlist/delegate.go | 3 +- .../hashicorp/memberlist/memberlist.go | 183 ++++++++--- vendor/github.com/hashicorp/memberlist/net.go | 58 +++- .../github.com/hashicorp/memberlist/state.go | 300 ++++++++++++------ .../hashicorp/memberlist/suspicion.go | 130 ++++++++ .../github.com/hashicorp/memberlist/util.go | 7 + 10 files changed, 656 insertions(+), 165 deletions(-) create mode 100644 vendor/github.com/hashicorp/memberlist/awareness.go create mode 100644 vendor/github.com/hashicorp/memberlist/suspicion.go diff --git a/Godeps/Godeps.json b/Godeps/Godeps.json index 1bf83dcf5e..167286e239 100644 --- a/Godeps/Godeps.json +++ b/Godeps/Godeps.json @@ -206,7 +206,7 @@ }, { "ImportPath": "github.com/hashicorp/memberlist", - "Rev": "cef12ad58224d55cf26caa9e3d239c2fcb3432a2" + "Rev": "215aec831f03c9b7c61ac183d3e28fff3c7d3a37" }, { "ImportPath": "github.com/hashicorp/net-rpc-msgpackrpc", @@ -226,13 +226,13 @@ }, { "ImportPath": "github.com/hashicorp/serf/coordinate", - "Comment": "v0.7.0-62-gb60a6d9", - "Rev": "b60a6d928fe726a588f79a1d500582507f9d79de" + "Comment": "v0.7.0-64-gdce30f1", + "Rev": "dce30f1c7806bf2d96478abb983c53af0e4c8fb2" }, { "ImportPath": "github.com/hashicorp/serf/serf", - "Comment": "v0.7.0-62-gb60a6d9", - "Rev": "b60a6d928fe726a588f79a1d500582507f9d79de" + "Comment": "v0.7.0-64-gdce30f1", + "Rev": "dce30f1c7806bf2d96478abb983c53af0e4c8fb2" }, { "ImportPath": "github.com/hashicorp/yamux", diff --git a/vendor/github.com/hashicorp/memberlist/README.md b/vendor/github.com/hashicorp/memberlist/README.md index c8a125f2ca..fc605a59b4 100644 --- 
a/vendor/github.com/hashicorp/memberlist/README.md +++ b/vendor/github.com/hashicorp/memberlist/README.md @@ -82,7 +82,7 @@ least one existing member in order to join the cluster. The new member does a full state sync with the existing member over TCP and begins gossiping its existence to the cluster. -Gossip is done over UDP to a with a configurable but fixed fanout and interval. +Gossip is done over UDP with a configurable but fixed fanout and interval. This ensures that network usage is constant with regards to number of nodes, as opposed to exponential growth that can occur with traditional heartbeat mechanisms. Complete state exchanges with a random node are done periodically over diff --git a/vendor/github.com/hashicorp/memberlist/awareness.go b/vendor/github.com/hashicorp/memberlist/awareness.go new file mode 100644 index 0000000000..ea95c75388 --- /dev/null +++ b/vendor/github.com/hashicorp/memberlist/awareness.go @@ -0,0 +1,69 @@ +package memberlist + +import ( + "sync" + "time" + + "github.com/armon/go-metrics" +) + +// awareness manages a simple metric for tracking the estimated health of the +// local node. Health is primary the node's ability to respond in the soft +// real-time manner required for correct health checking of other nodes in the +// cluster. +type awareness struct { + sync.RWMutex + + // max is the upper threshold for the timeout scale (the score will be + // constrained to be from 0 <= score < max). + max int + + // score is the current awareness score. Lower values are healthier and + // zero is the minimum value. + score int +} + +// newAwareness returns a new awareness object. +func newAwareness(max int) *awareness { + return &awareness{ + max: max, + score: 0, + } +} + +// ApplyDelta takes the given delta and applies it to the score in a thread-safe +// manner. It also enforces a floor of zero and a max of max, so deltas may not +// change the overall score if it's railed at one of the extremes. 
+func (a *awareness) ApplyDelta(delta int) { + a.Lock() + initial := a.score + a.score += delta + if a.score < 0 { + a.score = 0 + } else if a.score > (a.max - 1) { + a.score = (a.max - 1) + } + final := a.score + a.Unlock() + + if initial != final { + metrics.SetGauge([]string{"memberlist", "health", "score"}, float32(final)) + } +} + +// GetHealthScore returns the raw health score. +func (a *awareness) GetHealthScore() int { + a.RLock() + score := a.score + a.RUnlock() + return score +} + +// ScaleTimeout takes the given duration and scales it based on the current +// score. Less healthyness will lead to longer timeouts. +func (a *awareness) ScaleTimeout(timeout time.Duration) time.Duration { + a.RLock() + score := a.score + a.RUnlock() + return timeout * (time.Duration(score) + 1) +} diff --git a/vendor/github.com/hashicorp/memberlist/config.go b/vendor/github.com/hashicorp/memberlist/config.go index 0efd321b21..85a93f4228 100644 --- a/vendor/github.com/hashicorp/memberlist/config.go +++ b/vendor/github.com/hashicorp/memberlist/config.go @@ -63,6 +63,23 @@ type Config struct { // still alive. SuspicionMult int + // SuspicionMaxTimeoutMult is the multiplier applied to the + // SuspicionTimeout used as an upper bound on detection time. This max + // timeout is calculated using the formula: + // + // SuspicionMaxTimeout = SuspicionMaxTimeoutMult * SuspicionTimeout + // + // If everything is working properly, confirmations from other nodes will + // accelerate suspicion timers in a manner which will cause the timeout + // to reach the base SuspicionTimeout before that elapses, so this value + // will typically only come into play if a node is experiencing issues + // communicating with other nodes. 
It should be set to a something fairly + // large so that a node having problems will have a lot of chances to + // recover before falsely declaring other nodes as failed, but short + // enough for a legitimately isolated node to still make progress marking + // nodes failed in a reasonable amount of time. + SuspicionMaxTimeoutMult int + // PushPullInterval is the interval between complete state syncs. // Complete state syncs are done with a single node over TCP and are // quite expensive relative to standard gossiped messages. Setting this @@ -91,6 +108,11 @@ type Config struct { // indirect UDP pings. DisableTcpPings bool + // AwarenessMaxMultiplier will increase the probe interval if the node + // becomes aware that it might be degraded and not meeting the soft real + // time requirements to reliably probe other nodes. + AwarenessMaxMultiplier int + // GossipInterval and GossipNodes are used to configure the gossip // behavior of memberlist. // @@ -143,6 +165,10 @@ type Config struct { Ping PingDelegate Alive AliveDelegate + // DNSConfigPath points to the system's DNS config file, usually located + // at /etc/resolv.conf. It can be overridden via config for easier testing. + DNSConfigPath string + // LogOutput is the writer where logs should be sent. If this is not // set, logging will go to stderr by default. You cannot specify both LogOutput // and Logger at the same time. 
@@ -164,20 +190,22 @@ type Config struct { func DefaultLANConfig() *Config { hostname, _ := os.Hostname() return &Config{ - Name: hostname, - BindAddr: "0.0.0.0", - BindPort: 7946, - AdvertiseAddr: "", - AdvertisePort: 7946, - ProtocolVersion: ProtocolVersion2Compatible, - TCPTimeout: 10 * time.Second, // Timeout after 10 seconds - IndirectChecks: 3, // Use 3 nodes for the indirect ping - RetransmitMult: 4, // Retransmit a message 4 * log(N+1) nodes - SuspicionMult: 5, // Suspect a node for 5 * log(N+1) * Interval - PushPullInterval: 30 * time.Second, // Low frequency - ProbeTimeout: 500 * time.Millisecond, // Reasonable RTT time for LAN - ProbeInterval: 1 * time.Second, // Failure check every second - DisableTcpPings: false, // TCP pings are safe, even with mixed versions + Name: hostname, + BindAddr: "0.0.0.0", + BindPort: 7946, + AdvertiseAddr: "", + AdvertisePort: 7946, + ProtocolVersion: ProtocolVersion2Compatible, + TCPTimeout: 10 * time.Second, // Timeout after 10 seconds + IndirectChecks: 3, // Use 3 nodes for the indirect ping + RetransmitMult: 4, // Retransmit a message 4 * log(N+1) nodes + SuspicionMult: 5, // Suspect a node for 5 * log(N+1) * Interval + SuspicionMaxTimeoutMult: 6, // For 10k nodes this will give a max timeout of 120 seconds + PushPullInterval: 30 * time.Second, // Low frequency + ProbeTimeout: 500 * time.Millisecond, // Reasonable RTT time for LAN + ProbeInterval: 1 * time.Second, // Failure check every second + DisableTcpPings: false, // TCP pings are safe, even with mixed versions + AwarenessMaxMultiplier: 8, // Probe interval backs off to 8 seconds GossipNodes: 3, // Gossip to 3 nodes GossipInterval: 200 * time.Millisecond, // Gossip more rapidly @@ -185,8 +213,9 @@ func DefaultLANConfig() *Config { EnableCompression: true, // Enable compression by default SecretKey: nil, + Keyring: nil, - Keyring: nil, + DNSConfigPath: "/etc/resolv.conf", } } diff --git a/vendor/github.com/hashicorp/memberlist/delegate.go 
b/vendor/github.com/hashicorp/memberlist/delegate.go index b1204a41e8..66aa2da796 100644 --- a/vendor/github.com/hashicorp/memberlist/delegate.go +++ b/vendor/github.com/hashicorp/memberlist/delegate.go @@ -19,7 +19,8 @@ type Delegate interface { // It can return a list of buffers to send. Each buffer should assume an // overhead as provided with a limit on the total byte size allowed. // The total byte size of the resulting data to send must not exceed - // the limit. + // the limit. Care should be taken that this method does not block, + // since doing so would block the entire UDP packet receive loop. GetBroadcasts(overhead, limit int) [][]byte // LocalState is used for a TCP Push/Pull. This is sent to diff --git a/vendor/github.com/hashicorp/memberlist/memberlist.go b/vendor/github.com/hashicorp/memberlist/memberlist.go index 38f971df20..7d7e563ef0 100644 --- a/vendor/github.com/hashicorp/memberlist/memberlist.go +++ b/vendor/github.com/hashicorp/memberlist/memberlist.go @@ -20,8 +20,12 @@ import ( "net" "os" "strconv" + "strings" "sync" "time" + + "github.com/hashicorp/go-multierror" + "github.com/miekg/dns" ) type Memberlist struct { @@ -39,9 +43,11 @@ type Memberlist struct { tcpListener *net.TCPListener handoff chan msgHandoff - nodeLock sync.RWMutex - nodes []*nodeState // Known nodes - nodeMap map[string]*nodeState // Maps Addr.String() -> NodeState + nodeLock sync.RWMutex + nodes []*nodeState // Known nodes + nodeMap map[string]*nodeState // Maps Addr.String() -> NodeState + nodeTimers map[string]*suspicion // Maps Addr.String() -> suspicion timer + awareness *awareness tickerLock sync.Mutex tickers []*time.Ticker @@ -57,7 +63,7 @@ type Memberlist struct { } // newMemberlist creates the network listeners. -// Does not schedule execution of background maintenence. +// Does not schedule execution of background maintenance. 
func newMemberlist(conf *Config) (*Memberlist, error) { if conf.ProtocolVersion < ProtocolVersionMin { return nil, fmt.Errorf("Protocol version '%d' too low. Must be in range: [%d, %d]", @@ -125,6 +131,8 @@ func newMemberlist(conf *Config) (*Memberlist, error) { tcpListener: tcpLn, handoff: make(chan msgHandoff, 1024), nodeMap: make(map[string]*nodeState), + nodeTimers: make(map[string]*suspicion), + awareness: newAwareness(conf.AwarenessMaxMultiplier), ackHandlers: make(map[uint32]*ackHandler), broadcasts: &TransmitLimitedQueue{RetransmitMult: conf.RetransmitMult}, logger: logger, @@ -166,72 +174,152 @@ func Create(conf *Config) (*Memberlist, error) { // none could be reached. If an error is returned, the node did not successfully // join the cluster. func (m *Memberlist) Join(existing []string) (int, error) { - // Attempt to join any of them numSuccess := 0 - var retErr error + var errs error for _, exist := range existing { - addrs, port, err := m.resolveAddr(exist) + addrs, err := m.resolveAddr(exist) if err != nil { - m.logger.Printf("[WARN] memberlist: Failed to resolve %s: %v", exist, err) - retErr = err + err = fmt.Errorf("Failed to resolve %s: %v", exist, err) + errs = multierror.Append(errs, err) + m.logger.Printf("[WARN] memberlist: %v", err) continue } for _, addr := range addrs { - if err := m.pushPullNode(addr, port, true); err != nil { - retErr = err + if err := m.pushPullNode(addr.ip, addr.port, true); err != nil { + err = fmt.Errorf("Failed to join %s: %v", addr.ip, err) + errs = multierror.Append(errs, err) + m.logger.Printf("[DEBUG] memberlist: %v", err) continue } numSuccess++ } } - if numSuccess > 0 { - retErr = nil + errs = nil + } + return numSuccess, errs +} + +// ipPort holds information about a node we want to try to join. +type ipPort struct { + ip net.IP + port uint16 +} + +// tcpLookupIP is a helper to initiate a TCP-based DNS lookup for the given host. 
+// The built-in Go resolver will do a UDP lookup first, and will only use TCP if +// the response has the truncate bit set, which isn't common on DNS servers like +// Consul's. By doing the TCP lookup directly, we get the best chance for the +// largest list of hosts to join. Since joins are relatively rare events, it's ok +// to do this rather expensive operation. +func (m *Memberlist) tcpLookupIP(host string, defaultPort uint16) ([]ipPort, error) { + // Don't attempt any TCP lookups against non-fully qualified domain + // names, since those will likely come from the resolv.conf file. + if !strings.Contains(host, ".") { + return nil, nil } - return numSuccess, retErr + // Make sure the domain name is terminated with a dot (we know there's + // at least one character at this point). + dn := host + if dn[len(dn)-1] != '.' { + dn = dn + "." + } + + // See if we can find a server to try. + cc, err := dns.ClientConfigFromFile(m.config.DNSConfigPath) + if err != nil { + return nil, err + } + if len(cc.Servers) > 0 { + // We support host:port in the DNS config, but need to add the + // default port if one is not supplied. + server := cc.Servers[0] + if !hasPort(server) { + server = net.JoinHostPort(server, cc.Port) + } + + // Do the lookup. + c := new(dns.Client) + c.Net = "tcp" + msg := new(dns.Msg) + msg.SetQuestion(dn, dns.TypeANY) + in, _, err := c.Exchange(msg, server) + if err != nil { + return nil, err + } + + // Handle any IPs we get back that we can attempt to join. + var ips []ipPort + for _, r := range in.Answer { + switch rr := r.(type) { + case (*dns.A): + ips = append(ips, ipPort{rr.A, defaultPort}) + case (*dns.AAAA): + ips = append(ips, ipPort{rr.AAAA, defaultPort}) + case (*dns.CNAME): + m.logger.Printf("[DEBUG] memberlist: Ignoring CNAME RR in TCP-first answer for '%s'", host) + } + } + return ips, nil + } + + return nil, nil } // resolveAddr is used to resolve the address into an address, // port, and error. 
If no port is given, use the default -func (m *Memberlist) resolveAddr(hostStr string) ([][]byte, uint16, error) { - ips := make([][]byte, 0) +func (m *Memberlist) resolveAddr(hostStr string) ([]ipPort, error) { + // Normalize the incoming string to host:port so we can apply Go's + // parser to it. port := uint16(0) + if !hasPort(hostStr) { + hostStr += ":" + strconv.Itoa(m.config.BindPort) + } host, sport, err := net.SplitHostPort(hostStr) - if ae, ok := err.(*net.AddrError); ok && ae.Err == "missing port in address" { - // error, port missing - we can solve this - port = uint16(m.config.BindPort) - host = hostStr - } else if err != nil { - // error, but not missing port - return ips, port, err - } else if lport, err := strconv.ParseUint(sport, 10, 16); err != nil { - // error, when parsing port - return ips, port, err - } else { - // no error - port = uint16(lport) + if err != nil { + return nil, err } - // Get the addresses that hostPort might resolve to - // ResolveTcpAddr requres ipv6 brackets to separate - // port numbers whereas ParseIP doesn't, but luckily - // SplitHostPort takes care of the brackets - if ip := net.ParseIP(host); ip == nil { - if pre, err := net.LookupIP(host); err == nil { - for _, ip := range pre { - ips = append(ips, ip) - } - } else { - return ips, port, err - } - } else { - ips = append(ips, ip) + // This will capture the supplied port, or the default one added above. + lport, err := strconv.ParseUint(sport, 10, 16) + if err != nil { + return nil, err + } + port = uint16(lport) + + // If it looks like an IP address we are done. The SplitHostPort() above + // will make sure the host part is in good shape for parsing, even for + // IPv6 addresses. + if ip := net.ParseIP(host); ip != nil { + return []ipPort{ipPort{ip, port}}, nil } - return ips, port, nil + // First try TCP so we have the best chance for the largest list of + // hosts to join. 
If this fails it's not fatal since this isn't a standard + // way to query DNS, and we have a fallback below. + ips, err := m.tcpLookupIP(host, port) + if err != nil { + m.logger.Printf("[DEBUG] memberlist: TCP-first lookup failed for '%s', falling back to UDP: %s", hostStr, err) + } + if len(ips) > 0 { + return ips, nil + } + + // If TCP didn't yield anything then use the normal Go resolver which + // will try UDP, then might possibly try TCP again if the UDP response + // indicates it was truncated. + ans, err := net.LookupIP(host) + if err != nil { + return nil, err + } + ips = make([]ipPort, 0, len(ans)) + for _, ip := range ans { + ips = append(ips, ipPort{ip, port}) + } + return ips, nil } // setAlive is used to mark this node as being alive. This is the same @@ -541,6 +629,13 @@ func (m *Memberlist) anyAlive() bool { return false } +// GetHealthScore gives this instance's idea of how well it is meeting the soft +// real-time requirements of the protocol. Lower numbers are better, and zero +// means "totally healthy". +func (m *Memberlist) GetHealthScore() int { + return m.awareness.GetHealthScore() +} + // ProtocolVersion returns the protocol version currently in use by // this memberlist. func (m *Memberlist) ProtocolVersion() uint8 { diff --git a/vendor/github.com/hashicorp/memberlist/net.go b/vendor/github.com/hashicorp/memberlist/net.go index b92dccb101..63cb6eb585 100644 --- a/vendor/github.com/hashicorp/memberlist/net.go +++ b/vendor/github.com/hashicorp/memberlist/net.go @@ -24,9 +24,15 @@ const ( // A memberlist speaking version 2 of the protocol will attempt // to TCP ping another memberlist who understands version 3 or // greater. + // + // Version 4 added support for nacks as part of indirect probes. + // A memberlist speaking version 2 of the protocol will expect + // nacks from another memberlist who understands version 4 or + // greater, and likewise nacks will be sent to memberlists who + // understand version 4 or greater. 
ProtocolVersion2Compatible = 2 - ProtocolVersionMax = 3 + ProtocolVersionMax = 4 ) // messageType is an integer ID of a type of message that can be received @@ -46,6 +52,7 @@ const ( userMsg // User mesg, not handled by us compressMsg encryptMsg + nackRespMsg ) // compressionType is used to specify the compression algorithm @@ -83,6 +90,7 @@ type indirectPingReq struct { Target []byte Port uint16 Node string + Nack bool // true if we'd like a nack back } // ack response is sent for a ping @@ -91,6 +99,13 @@ type ackResp struct { Payload []byte } +// nack response is sent for an indirect ping when the pinger doesn't hear from +// the ping-ee within the configured timeout. This lets the original node know +// that the indirect ping attempt happened but didn't succeed. +type nackResp struct { + SeqNo uint32 +} + // suspect is broadcast when we suspect a node is dead type suspect struct { Incarnation uint32 @@ -121,7 +136,7 @@ type dead struct { } // pushPullHeader is used to inform the -// otherside how many states we are transfering +// otherside how many states we are transferring type pushPullHeader struct { Nodes int UserStateLen int // Encodes the byte lengh of user state @@ -134,7 +149,7 @@ type userMsgHeader struct { } // pushNodeState is used for pushPullReq when we are -// transfering out node states +// transferring out node states type pushNodeState struct { Name string Addr []byte @@ -343,6 +358,8 @@ func (m *Memberlist) handleCommand(buf []byte, from net.Addr, timestamp time.Tim m.handleIndirectPing(buf, from) case ackRespMsg: m.handleAck(buf, from, timestamp) + case nackRespMsg: + m.handleNack(buf, from) case suspectMsg: fallthrough @@ -440,18 +457,23 @@ func (m *Memberlist) handleIndirectPing(buf []byte, from net.Addr) { } // For proto versions < 2, there is no port provided. Mask old - // behavior by using the configured port + // behavior by using the configured port. 
if m.ProtocolVersion() < 2 || ind.Port == 0 { ind.Port = uint16(m.config.BindPort) } - // Send a ping to the correct host + // Send a ping to the correct host. localSeqNo := m.nextSeqNo() ping := ping{SeqNo: localSeqNo, Node: ind.Node} destAddr := &net.UDPAddr{IP: ind.Target, Port: int(ind.Port)} // Setup a response handler to relay the ack + cancelCh := make(chan struct{}) respHandler := func(payload []byte, timestamp time.Time) { + // Try to prevent the nack if we've caught it in time. + close(cancelCh) + + // Forward the ack back to the requestor. ack := ackResp{ind.SeqNo, nil} if err := m.encodeAndSendMsg(from, ackRespMsg, &ack); err != nil { m.logger.Printf("[ERR] memberlist: Failed to forward ack: %s %s", err, LogAddress(from)) @@ -459,10 +481,25 @@ func (m *Memberlist) handleIndirectPing(buf []byte, from net.Addr) { } m.setAckHandler(localSeqNo, respHandler, m.config.ProbeTimeout) - // Send the ping + // Send the ping. if err := m.encodeAndSendMsg(destAddr, pingMsg, &ping); err != nil { m.logger.Printf("[ERR] memberlist: Failed to send ping: %s %s", err, LogAddress(from)) } + + // Setup a timer to fire off a nack if no ack is seen in time. 
+ if ind.Nack { + go func() { + select { + case <-cancelCh: + return + case <-time.After(m.config.ProbeTimeout): + nack := nackResp{ind.SeqNo} + if err := m.encodeAndSendMsg(from, nackRespMsg, &nack); err != nil { + m.logger.Printf("[ERR] memberlist: Failed to send nack: %s %s", err, LogAddress(from)) + } + } + }() + } } func (m *Memberlist) handleAck(buf []byte, from net.Addr, timestamp time.Time) { @@ -474,6 +511,15 @@ func (m *Memberlist) handleAck(buf []byte, from net.Addr, timestamp time.Time) { m.invokeAckHandler(ack, timestamp) } +func (m *Memberlist) handleNack(buf []byte, from net.Addr) { + var nack nackResp + if err := decode(buf, &nack); err != nil { + m.logger.Printf("[ERR] memberlist: Failed to decode nack response: %s %s", err, LogAddress(from)) + return + } + m.invokeNackHandler(nack) +} + func (m *Memberlist) handleSuspect(buf []byte, from net.Addr) { var sus suspect if err := decode(buf, &sus); err != nil { diff --git a/vendor/github.com/hashicorp/memberlist/state.go b/vendor/github.com/hashicorp/memberlist/state.go index d0339bd158..77871299d7 100644 --- a/vendor/github.com/hashicorp/memberlist/state.go +++ b/vendor/github.com/hashicorp/memberlist/state.go @@ -42,10 +42,11 @@ type nodeState struct { StateChange time.Time // Time last state change happened } -// ackHandler is used to register handlers for incoming acks +// ackHandler is used to register handlers for incoming acks and nacks. type ackHandler struct { - handler func([]byte, time.Time) - timer *time.Timer + ackFn func([]byte, time.Time) + nackFn func() + timer *time.Timer } // NoPingResponseError is used to indicate a 'ping' packet was @@ -148,7 +149,7 @@ func (m *Memberlist) pushPullTrigger(stop <-chan struct{}) { } } -// Deschedule is used to stop the background maintenence. This is safe +// Deschedule is used to stop the background maintenance. This is safe // to call multiple times. 
func (m *Memberlist) deschedule() { m.tickerLock.Lock() @@ -219,17 +220,51 @@ START: func (m *Memberlist) probeNode(node *nodeState) { defer metrics.MeasureSince([]string{"memberlist", "probeNode"}, time.Now()) + // We use our health awareness to scale the overall probe interval, so we + // slow down if we detect problems. The ticker that calls us can handle + // us running over the base interval, and will skip missed ticks. + probeInterval := m.awareness.ScaleTimeout(m.config.ProbeInterval) + if probeInterval > m.config.ProbeInterval { + metrics.IncrCounter([]string{"memberlist", "degraded", "probe"}, 1) + } + // Prepare a ping message and setup an ack handler. ping := ping{SeqNo: m.nextSeqNo(), Node: node.Name} ackCh := make(chan ackMessage, m.config.IndirectChecks+1) - m.setAckChannel(ping.SeqNo, ackCh, m.config.ProbeInterval) + nackCh := make(chan struct{}, m.config.IndirectChecks+1) + m.setProbeChannels(ping.SeqNo, ackCh, nackCh, probeInterval) - // Send a ping to the node. - deadline := time.Now().Add(m.config.ProbeInterval) + // Send a ping to the node. If this node looks like it's suspect or dead, + // also tack on a suspect message so that it has a chance to refute as + // soon as possible. 
+ deadline := time.Now().Add(probeInterval) destAddr := &net.UDPAddr{IP: node.Addr, Port: int(node.Port)} - if err := m.encodeAndSendMsg(destAddr, pingMsg, &ping); err != nil { - m.logger.Printf("[ERR] memberlist: Failed to send ping: %s", err) - return + if node.State == stateAlive { + if err := m.encodeAndSendMsg(destAddr, pingMsg, &ping); err != nil { + m.logger.Printf("[ERR] memberlist: Failed to send ping: %s", err) + return + } + } else { + var msgs [][]byte + if buf, err := encode(pingMsg, &ping); err != nil { + m.logger.Printf("[ERR] memberlist: Failed to encode ping message: %s", err) + return + } else { + msgs = append(msgs, buf.Bytes()) + } + s := suspect{Incarnation: node.Incarnation, Node: node.Name, From: m.config.Name} + if buf, err := encode(suspectMsg, &s); err != nil { + m.logger.Printf("[ERR] memberlist: Failed to encode suspect message: %s", err) + return + } else { + msgs = append(msgs, buf.Bytes()) + } + + compound := makeCompoundMessage(msgs) + if err := m.rawSendMsgUDP(destAddr, compound.Bytes()); err != nil { + m.logger.Printf("[ERR] memberlist: Failed to send compound ping and suspect message to %s: %s", destAddr, err) + return + } } // Mark the sent time here, which should be after any pre-processing and @@ -237,6 +272,16 @@ func (m *Memberlist) probeNode(node *nodeState) { // but it's the best we can do. sent := time.Now() + // Arrange for our self-awareness to get updated. At this point we've + // sent the ping, so any return statement means the probe succeeded + // which will improve our health until we get to the failure scenarios + // at the end of this function, which will alter this delta variable + // accordingly. + awarenessDelta := -1 + defer func() { + m.awareness.ApplyDelta(awarenessDelta) + }() + // Wait for response or round-trip-time. 
select { case v := <-ackCh: @@ -254,6 +299,12 @@ func (m *Memberlist) probeNode(node *nodeState) { ackCh <- v } case <-time.After(m.config.ProbeTimeout): + // Note that we don't scale this timeout based on awareness and + // the health score. That's because we don't really expect waiting + // longer to help get UDP through. Since health does extend the + // probe interval it will give the TCP fallback more time, which + // is more active in dealing with lost packets, and it gives more + // time to wait for indirect acks/nacks. m.logger.Printf("[DEBUG] memberlist: Failed UDP ping: %v (timeout reached)", node.Name) } @@ -264,8 +315,15 @@ func (m *Memberlist) probeNode(node *nodeState) { m.nodeLock.RUnlock() // Attempt an indirect ping. + expectedNacks := 0 ind := indirectPingReq{SeqNo: ping.SeqNo, Target: node.Addr, Port: node.Port, Node: node.Name} for _, peer := range kNodes { + // We only expect nack to be sent from peers who understand + // version 4 of the protocol. + if ind.Nack = peer.PMax >= 4; ind.Nack { + expectedNacks++ + } + destAddr := &net.UDPAddr{IP: peer.Addr, Port: int(peer.Port)} if err := m.encodeAndSendMsg(destAddr, indirectPingMsg, &ind); err != nil { m.logger.Printf("[ERR] memberlist: Failed to send indirect ping: %s", err) @@ -319,7 +377,23 @@ func (m *Memberlist) probeNode(node *nodeState) { } } - // No acks received from target, suspect + // Update our self-awareness based on the results of this failed probe. + // If we don't have peers who will send nacks then we penalize for any + // failed probe as a simple health metric. If we do have peers to nack + // verify, then we can use that as a more sophisticated measure of self- + // health because we assume them to be working, and they can help us + // decide if the probed node was really dead or if it was something wrong + // with ourselves. 
+ awarenessDelta = 0 + if expectedNacks > 0 { + if nackCount := len(nackCh); nackCount < expectedNacks { + awarenessDelta += 2 * (expectedNacks - nackCount) + } + } else { + awarenessDelta += 1 + } + + // No acks received from target, suspect it as failed. m.logger.Printf("[INFO] memberlist: Suspect %s has failed, no acks received", node.Name) s := suspect{Incarnation: node.Incarnation, Node: node.Name, From: m.config.Name} m.suspectNode(&s) @@ -330,7 +404,7 @@ func (m *Memberlist) Ping(node string, addr net.Addr) (time.Duration, error) { // Prepare a ping message and setup an ack handler. ping := ping{SeqNo: m.nextSeqNo(), Node: node} ackCh := make(chan ackMessage, m.config.IndirectChecks+1) - m.setAckChannel(ping.SeqNo, ackCh, m.config.ProbeInterval) + m.setProbeChannels(ping.SeqNo, ackCh, nil, m.config.ProbeInterval) // Send a ping to the node. if err := m.encodeAndSendMsg(addr, pingMsg, &ping); err != nil { @@ -584,6 +658,11 @@ func (m *Memberlist) nextIncarnation() uint32 { return atomic.AddUint32(&m.incarnation, 1) } +// skipIncarnation adds the positive offset to the incarnation number. +func (m *Memberlist) skipIncarnation(offset uint32) uint32 { + return atomic.AddUint32(&m.incarnation, offset) +} + // estNumNodes is used to get the current estimate of the number of nodes func (m *Memberlist) estNumNodes() int { return int(atomic.LoadUint32(&m.numNodes)) @@ -595,19 +674,27 @@ type ackMessage struct { Timestamp time.Time } -// setAckChannel is used to attach a channel to receive a message when an ack with a given -// sequence number is received. The `complete` field of the message will be false on timeout -func (m *Memberlist) setAckChannel(seqNo uint32, ch chan ackMessage, timeout time.Duration) { - // Create a handler function - handler := func(payload []byte, timestamp time.Time) { +// setProbeChannels is used to attach the ackCh to receive a message when an ack +// with a given sequence number is received. 
The `complete` field of the message +// will be false on timeout. Any nack messages will cause an empty struct to be +// passed to the nackCh, which can be nil if not needed. +func (m *Memberlist) setProbeChannels(seqNo uint32, ackCh chan ackMessage, nackCh chan struct{}, timeout time.Duration) { + // Create handler functions for acks and nacks + ackFn := func(payload []byte, timestamp time.Time) { select { - case ch <- ackMessage{true, payload, timestamp}: + case ackCh <- ackMessage{true, payload, timestamp}: + default: + } + } + nackFn := func() { + select { + case nackCh <- struct{}{}: default: } } - // Add the handler - ah := &ackHandler{handler, nil} + // Add the handlers + ah := &ackHandler{ackFn, nackFn, nil} m.ackLock.Lock() m.ackHandlers[seqNo] = ah m.ackLock.Unlock() @@ -618,18 +705,19 @@ func (m *Memberlist) setAckChannel(seqNo uint32, ch chan ackMessage, timeout tim delete(m.ackHandlers, seqNo) m.ackLock.Unlock() select { - case ch <- ackMessage{false, nil, time.Now()}: + case ackCh <- ackMessage{false, nil, time.Now()}: default: } }) } -// setAckHandler is used to attach a handler to be invoked when an -// ack with a given sequence number is received. If a timeout is reached, -// the handler is deleted -func (m *Memberlist) setAckHandler(seqNo uint32, handler func([]byte, time.Time), timeout time.Duration) { +// setAckHandler is used to attach a handler to be invoked when an ack with a +// given sequence number is received. If a timeout is reached, the handler is +// deleted. This is used for indirect pings so does not configure a function +// for nacks. 
+func (m *Memberlist) setAckHandler(seqNo uint32, ackFn func([]byte, time.Time), timeout time.Duration) { // Add the handler - ah := &ackHandler{handler, nil} + ah := &ackHandler{ackFn, nil, nil} m.ackLock.Lock() m.ackHandlers[seqNo] = ah m.ackLock.Unlock() @@ -642,7 +730,7 @@ func (m *Memberlist) setAckHandler(seqNo uint32, handler func([]byte, time.Time) }) } -// Invokes an Ack handler if any is associated, and reaps the handler immediately +// Invokes an ack handler if any is associated, and reaps the handler immediately func (m *Memberlist) invokeAckHandler(ack ackResp, timestamp time.Time) { m.ackLock.Lock() ah, ok := m.ackHandlers[ack.SeqNo] @@ -652,7 +740,49 @@ func (m *Memberlist) invokeAckHandler(ack ackResp, timestamp time.Time) { return } ah.timer.Stop() - ah.handler(ack.Payload, timestamp) + ah.ackFn(ack.Payload, timestamp) +} + +// Invokes nack handler if any is associated. +func (m *Memberlist) invokeNackHandler(nack nackResp) { + m.ackLock.Lock() + ah, ok := m.ackHandlers[nack.SeqNo] + m.ackLock.Unlock() + if !ok || ah.nackFn == nil { + return + } + ah.nackFn() +} + +// refute gossips an alive message in response to incoming information that we +// are suspect or dead. It will make sure the incarnation number beats the given +// accusedInc value, or you can supply 0 to just get the next incarnation number. +// This alters the node state that's passed in so this MUST be called while the +// nodeLock is held. +func (m *Memberlist) refute(me *nodeState, accusedInc uint32) { + // Make sure the incarnation number beats the accusation. + inc := m.nextIncarnation() + if accusedInc >= inc { + inc = m.skipIncarnation(accusedInc - inc + 1) + } + me.Incarnation = inc + + // Decrease our health because we are being asked to refute a problem. + m.awareness.ApplyDelta(1) + + // Format and broadcast an alive message. 
+ a := alive{ + Incarnation: inc, + Node: me.Name, + Addr: me.Addr, + Port: me.Port, + Meta: me.Meta, + Vsn: []uint8{ + me.PMin, me.PMax, me.PCur, + me.DMin, me.DMax, me.DCur, + }, + } + m.encodeAndBroadcast(me.Addr.String(), aliveMsg, a) } // aliveNode is invoked by the network layer when we get a message about a @@ -754,6 +884,9 @@ func (m *Memberlist) aliveNode(a *alive, notify chan struct{}, bootstrap bool) { return } + // Clear out any suspicion timer that may be in effect. + delete(m.nodeTimers, a.Node) + // Store the old state and meta data oldState := state.State oldMeta := state.Meta @@ -783,21 +916,7 @@ func (m *Memberlist) aliveNode(a *alive, notify chan struct{}, bootstrap bool) { return } - inc := m.nextIncarnation() - for a.Incarnation >= inc { - inc = m.nextIncarnation() - } - state.Incarnation = inc - - a := alive{ - Incarnation: inc, - Node: state.Name, - Addr: state.Addr, - Port: state.Port, - Meta: state.Meta, - Vsn: versions, - } - m.encodeBroadcastNotify(a.Node, aliveMsg, a, notify) + m.refute(state, a.Incarnation) m.logger.Printf("[WARN] memberlist: Refuting an alive message") } else { m.encodeBroadcastNotify(a.Node, aliveMsg, a, notify) @@ -854,6 +973,17 @@ func (m *Memberlist) suspectNode(s *suspect) { return } + // See if there's a suspicion timer we can confirm. If the info is new + // to us we will go ahead and re-gossip it. This allows for multiple + // independent confirmations to flow even when a node probes a node + // that's already suspect. 
+ if timer, ok := m.nodeTimers[s.Node]; ok { + if timer.Confirm(s.From) { + m.encodeAndBroadcast(s.Node, suspectMsg, s) + } + return + } + // Ignore non-alive nodes if state.State != stateAlive { return @@ -861,24 +991,7 @@ func (m *Memberlist) suspectNode(s *suspect) { // If this is us we need to refute, otherwise re-broadcast if state.Name == m.config.Name { - inc := m.nextIncarnation() - for s.Incarnation >= inc { - inc = m.nextIncarnation() - } - state.Incarnation = inc - - a := alive{ - Incarnation: inc, - Node: state.Name, - Addr: state.Addr, - Port: state.Port, - Meta: state.Meta, - Vsn: []uint8{ - state.PMin, state.PMax, state.PCur, - state.DMin, state.DMax, state.DCur, - }, - } - m.encodeAndBroadcast(s.Node, aliveMsg, a) + m.refute(state, s.Incarnation) m.logger.Printf("[WARN] memberlist: Refuting a suspect message (from: %s)", s.From) return // Do not mark ourself suspect } else { @@ -894,26 +1007,41 @@ func (m *Memberlist) suspectNode(s *suspect) { changeTime := time.Now() state.StateChange = changeTime - // Setup a timeout for this - timeout := suspicionTimeout(m.config.SuspicionMult, m.estNumNodes(), m.config.ProbeInterval) - time.AfterFunc(timeout, func() { + // Setup a suspicion timer. Given that we don't have any known phase + // relationship with our peers, we set up k such that we hit the nominal + // timeout two probe intervals short of what we expect given the suspicion + // multiplier. + k := m.config.SuspicionMult - 2 + + // If there aren't enough nodes to give the expected confirmations, just + // set k to 0 to say that we don't expect any. Note we subtract 2 from n + // here to take out ourselves and the node being probed. + n := m.estNumNodes() + if n-2 < k { + k = 0 + } + + // Compute the timeouts based on the size of the cluster. 
+ min := suspicionTimeout(m.config.SuspicionMult, n, m.config.ProbeInterval) + max := time.Duration(m.config.SuspicionMaxTimeoutMult) * min + fn := func(numConfirmations int) { m.nodeLock.Lock() state, ok := m.nodeMap[s.Node] timeout := ok && state.State == stateSuspect && state.StateChange == changeTime m.nodeLock.Unlock() if timeout { - m.suspectTimeout(state) - } - }) -} + if k > 0 && numConfirmations < k { + metrics.IncrCounter([]string{"memberlist", "degraded", "timeout"}, 1) + } -// suspectTimeout is invoked when a suspect timeout has occurred -func (m *Memberlist) suspectTimeout(n *nodeState) { - // Construct a dead message - m.logger.Printf("[INFO] memberlist: Marking %s as failed, suspect timeout reached", n.Name) - d := dead{Incarnation: n.Incarnation, Node: n.Name, From: m.config.Name} - m.deadNode(&d) + m.logger.Printf("[INFO] memberlist: Marking %s as failed, suspect timeout reached (%d peer confirmations)", + state.Name, numConfirmations) + d := dead{Incarnation: state.Incarnation, Node: state.Name, From: m.config.Name} + m.deadNode(&d) + } + } + m.nodeTimers[s.Node] = newSuspicion(s.From, k, min, max, fn) } // deadNode is invoked by the network layer when we get a message @@ -933,6 +1061,9 @@ func (m *Memberlist) deadNode(d *dead) { return } + // Clear out any suspicion timer that may be in effect. 
+ delete(m.nodeTimers, d.Node) + // Ignore if node is already dead if state.State == stateDead { return @@ -942,24 +1073,7 @@ func (m *Memberlist) deadNode(d *dead) { if state.Name == m.config.Name { // If we are not leaving we need to refute if !m.leave { - inc := m.nextIncarnation() - for d.Incarnation >= inc { - inc = m.nextIncarnation() - } - state.Incarnation = inc - - a := alive{ - Incarnation: inc, - Node: state.Name, - Addr: state.Addr, - Port: state.Port, - Meta: state.Meta, - Vsn: []uint8{ - state.PMin, state.PMax, state.PCur, - state.DMin, state.DMax, state.DCur, - }, - } - m.encodeAndBroadcast(d.Node, aliveMsg, a) + m.refute(state, d.Incarnation) m.logger.Printf("[WARN] memberlist: Refuting a dead message (from: %s)", d.From) return // Do not mark ourself dead } @@ -1001,7 +1115,7 @@ func (m *Memberlist) mergeState(remote []pushNodeState) { m.aliveNode(&a, nil, false) case stateDead: - // If the remote node belives a node is dead, we prefer to + // If the remote node believes a node is dead, we prefer to // suspect that node instead of declaring it dead instantly fallthrough case stateSuspect: diff --git a/vendor/github.com/hashicorp/memberlist/suspicion.go b/vendor/github.com/hashicorp/memberlist/suspicion.go new file mode 100644 index 0000000000..5f573e1fc6 --- /dev/null +++ b/vendor/github.com/hashicorp/memberlist/suspicion.go @@ -0,0 +1,130 @@ +package memberlist + +import ( + "math" + "sync/atomic" + "time" +) + +// suspicion manages the suspect timer for a node and provides an interface +// to accelerate the timeout as we get more independent confirmations that +// a node is suspect. +type suspicion struct { + // n is the number of independent confirmations we've seen. This must + // be updated using atomic instructions to prevent contention with the + // timer callback. + n int32 + + // k is the number of independent confirmations we'd like to see in + // order to drive the timer to its minimum value. 
+ k int32 + + // min is the minimum timer value. + min time.Duration + + // max is the maximum timer value. + max time.Duration + + // start captures the timestamp when we began the timer. This is used + // so we can calculate durations to feed the timer during updates in + // a way that achieves the overall time we'd like. + start time.Time + + // timer is the underlying timer that implements the timeout. + timer *time.Timer + + // timeoutFn is the function to call when the timer expires. We hold on to this + // because there are cases where we call it directly. + timeoutFn func() + + // confirmations is a map of "from" nodes that have confirmed a given + // node is suspect. This prevents double counting. + confirmations map[string]struct{} +} + +// newSuspicion returns a timer started with the max time, and that will drive +// to the min time after seeing k or more confirmations. The from node will be +// excluded from confirmations since we might get our own suspicion message +// gossiped back to us. The minimum time will be used if no confirmations are +// called for (k <= 0). +func newSuspicion(from string, k int, min time.Duration, max time.Duration, fn func(int)) *suspicion { + s := &suspicion{ + k: int32(k), + min: min, + max: max, + confirmations: make(map[string]struct{}), + } + + // Exclude the from node from any confirmations. + s.confirmations[from] = struct{}{} + + // Pass the number of confirmations into the timeout function for + // easy telemetry. + s.timeoutFn = func() { + fn(int(atomic.LoadInt32(&s.n))) + } + + // If there aren't any confirmations to be made then take the min + // time from the start. + timeout := max + if k < 1 { + timeout = min + } + s.timer = time.AfterFunc(timeout, s.timeoutFn) + + // Capture the start time right after starting the timer above so + // we should always err on the side of a little longer timeout if + // there's any preemption that separates this and the step above. 
+ s.start = time.Now() + return s +} + +// remainingSuspicionTime takes the state variables of the suspicion timer and +// calculates the remaining time to wait before considering a node dead. The +// return value can be negative, so be prepared to fire the timer immediately in +// that case. +func remainingSuspicionTime(n, k int32, elapsed time.Duration, min, max time.Duration) time.Duration { + frac := math.Log(float64(n)+1.0) / math.Log(float64(k)+1.0) + raw := max.Seconds() - frac*(max.Seconds()-min.Seconds()) + timeout := time.Duration(math.Floor(1000.0*raw)) * time.Millisecond + if timeout < min { + timeout = min + } + + // We have to take into account the amount of time that has passed so + // far, so we get the right overall timeout. + return timeout - elapsed +} + +// Confirm registers that a possibly new peer has also determined the given +// node is suspect. This returns true if this was new information, and false +// if it was a duplicate confirmation, or if we've got enough confirmations to +// hit the minimum. +func (s *suspicion) Confirm(from string) bool { + // If we've got enough confirmations then stop accepting them. + if atomic.LoadInt32(&s.n) >= s.k { + return false + } + + // Only allow one confirmation from each possible peer. + if _, ok := s.confirmations[from]; ok { + return false + } + s.confirmations[from] = struct{}{} + + // Compute the new timeout given the current number of confirmations and + // adjust the timer. If the timeout becomes negative *and* we can cleanly + // stop the timer then we will call the timeout function directly from + // here. 
+ n := atomic.AddInt32(&s.n, 1) + elapsed := time.Now().Sub(s.start) + remaining := remainingSuspicionTime(n, s.k, elapsed, s.min, s.max) + if s.timer.Stop() { + if remaining > 0 { + s.timer.Reset(remaining) + } else { + go s.timeoutFn() + } + } + return true +} diff --git a/vendor/github.com/hashicorp/memberlist/util.go b/vendor/github.com/hashicorp/memberlist/util.go index 38a92cbe60..7a59e3b370 100644 --- a/vendor/github.com/hashicorp/memberlist/util.go +++ b/vendor/github.com/hashicorp/memberlist/util.go @@ -9,6 +9,7 @@ import ( "math" "math/rand" "net" + "strings" "time" "github.com/hashicorp/go-msgpack/codec" @@ -326,6 +327,12 @@ func isLoopbackIP(ip_str string) bool { return loopbackBlock.Contains(ip) } +// Given a string of the form "host", "host:port", or "[ipv6::address]:port", +// return true if the string includes a port. +func hasPort(s string) bool { + return strings.LastIndex(s, ":") > strings.LastIndex(s, "]") +} + // compressPayload takes an opaque input buffer, compresses it // and wraps it in a compress{} message that is encoded. func compressPayload(inp []byte) (*bytes.Buffer, error) {