Update memberlist vendor deps

This commit is contained in:
Kyle Havlovitz 2016-12-05 17:27:52 -05:00
parent 4efa1a72fa
commit acc21cc5fe
No known key found for this signature in database
GPG Key ID: 8A5E6B173056AD6C
6 changed files with 69 additions and 17 deletions

View File

@ -179,6 +179,11 @@ type Config struct {
// behavior for using LogOutput. You cannot specify both LogOutput and Logger // behavior for using LogOutput. You cannot specify both LogOutput and Logger
// at the same time. // at the same time.
Logger *log.Logger Logger *log.Logger
// Size of Memberlist's internal channel which handles UDP messages. The
// size of this determines the size of the queue which Memberlist will keep
// while UDP messages are handled.
HandoffQueueDepth int
} }
// DefaultLANConfig returns a sane set of configurations for Memberlist. // DefaultLANConfig returns a sane set of configurations for Memberlist.
@ -216,6 +221,8 @@ func DefaultLANConfig() *Config {
Keyring: nil, Keyring: nil,
DNSConfigPath: "/etc/resolv.conf", DNSConfigPath: "/etc/resolv.conf",
HandoffQueueDepth: 1024,
} }
} }

View File

@ -129,7 +129,7 @@ func newMemberlist(conf *Config) (*Memberlist, error) {
leaveBroadcast: make(chan struct{}, 1), leaveBroadcast: make(chan struct{}, 1),
udpListener: udpLn, udpListener: udpLn,
tcpListener: tcpLn, tcpListener: tcpLn,
handoff: make(chan msgHandoff, 1024), handoff: make(chan msgHandoff, conf.HandoffQueueDepth),
nodeMap: make(map[string]*nodeState), nodeMap: make(map[string]*nodeState),
nodeTimers: make(map[string]*suspicion), nodeTimers: make(map[string]*suspicion),
awareness: newAwareness(conf.AwarenessMaxMultiplier), awareness: newAwareness(conf.AwarenessMaxMultiplier),
@ -496,7 +496,7 @@ func (m *Memberlist) SendTo(to net.Addr, msg []byte) error {
buf = append(buf, msg...) buf = append(buf, msg...)
// Send the message // Send the message
return m.rawSendMsgUDP(to, buf) return m.rawSendMsgUDP(to, nil, buf)
} }
// SendToUDP is used to directly send a message to another node, without // SendToUDP is used to directly send a message to another node, without
@ -513,7 +513,7 @@ func (m *Memberlist) SendToUDP(to *Node, msg []byte) error {
// Send the message // Send the message
destAddr := &net.UDPAddr{IP: to.Addr, Port: int(to.Port)} destAddr := &net.UDPAddr{IP: to.Addr, Port: int(to.Port)}
return m.rawSendMsgUDP(destAddr, buf) return m.rawSendMsgUDP(destAddr, to, buf)
} }
// SendToTCP is used to directly send a message to another node, without // SendToTCP is used to directly send a message to another node, without

View File

@ -5,6 +5,7 @@ import (
"bytes" "bytes"
"encoding/binary" "encoding/binary"
"fmt" "fmt"
"hash/crc32"
"io" "io"
"net" "net"
"time" "time"
@ -32,7 +33,7 @@ const (
// understand version 4 or greater. // understand version 4 or greater.
ProtocolVersion2Compatible = 2 ProtocolVersion2Compatible = 2
ProtocolVersionMax = 4 ProtocolVersionMax = 5
) )
// messageType is an integer ID of a type of message that can be received // messageType is an integer ID of a type of message that can be received
@ -53,6 +54,7 @@ const (
compressMsg compressMsg
encryptMsg encryptMsg
nackRespMsg nackRespMsg
hasCrcMsg
) )
// compressionType is used to specify the compression algorithm // compressionType is used to specify the compression algorithm
@ -338,8 +340,18 @@ func (m *Memberlist) ingestPacket(buf []byte, from net.Addr, timestamp time.Time
buf = plain buf = plain
} }
// Handle the command // See if there's a checksum included to verify the contents of the message
m.handleCommand(buf, from, timestamp) if len(buf) >= 5 && messageType(buf[0]) == hasCrcMsg {
crc := crc32.ChecksumIEEE(buf[5:])
expected := binary.BigEndian.Uint32(buf[1:5])
if crc != expected {
m.logger.Printf("[WARN] memberlist: Got invalid checksum for UDP packet: %x, %x", crc, expected)
return
}
m.handleCommand(buf[5:], from, timestamp)
} else {
m.handleCommand(buf, from, timestamp)
}
} }
func (m *Memberlist) handleCommand(buf []byte, from net.Addr, timestamp time.Time) { func (m *Memberlist) handleCommand(buf []byte, from net.Addr, timestamp time.Time) {
@ -601,7 +613,7 @@ func (m *Memberlist) sendMsg(to net.Addr, msg []byte) error {
// Fast path if nothing to piggypack // Fast path if nothing to piggypack
if len(extra) == 0 { if len(extra) == 0 {
return m.rawSendMsgUDP(to, msg) return m.rawSendMsgUDP(to, nil, msg)
} }
// Join all the messages // Join all the messages
@ -613,11 +625,11 @@ func (m *Memberlist) sendMsg(to net.Addr, msg []byte) error {
compound := makeCompoundMessage(msgs) compound := makeCompoundMessage(msgs)
// Send the message // Send the message
return m.rawSendMsgUDP(to, compound.Bytes()) return m.rawSendMsgUDP(to, nil, compound.Bytes())
} }
// rawSendMsgUDP is used to send a UDP message to another host without modification // rawSendMsgUDP is used to send a UDP message to another host without modification
func (m *Memberlist) rawSendMsgUDP(to net.Addr, msg []byte) error { func (m *Memberlist) rawSendMsgUDP(addr net.Addr, node *Node, msg []byte) error {
// Check if we have compression enabled // Check if we have compression enabled
if m.config.EnableCompression { if m.config.EnableCompression {
buf, err := compressPayload(msg) buf, err := compressPayload(msg)
@ -631,6 +643,31 @@ func (m *Memberlist) rawSendMsgUDP(to net.Addr, msg []byte) error {
} }
} }
// Try to look up the destination node
if node == nil {
toAddr, _, err := net.SplitHostPort(addr.String())
if err != nil {
m.logger.Printf("[ERR] memberlist: Failed to parse address %q: %v", addr.String(), err)
return err
}
m.nodeLock.RLock()
nodeState, ok := m.nodeMap[toAddr]
m.nodeLock.RUnlock()
if ok {
node = &nodeState.Node
}
}
// Add a CRC to the end of the payload if the recipient understands
// ProtocolVersion >= 5
if node != nil && node.PMax >= 5 {
crc := crc32.ChecksumIEEE(msg)
header := make([]byte, 5, 5+len(msg))
header[0] = byte(hasCrcMsg)
binary.BigEndian.PutUint32(header[1:], crc)
msg = append(header, msg...)
}
// Check if we have encryption enabled // Check if we have encryption enabled
if m.config.EncryptionEnabled() { if m.config.EncryptionEnabled() {
// Encrypt the payload // Encrypt the payload
@ -645,7 +682,7 @@ func (m *Memberlist) rawSendMsgUDP(to net.Addr, msg []byte) error {
} }
metrics.IncrCounter([]string{"memberlist", "udp", "sent"}, float32(len(msg))) metrics.IncrCounter([]string{"memberlist", "udp", "sent"}, float32(len(msg)))
_, err := m.udpListener.WriteTo(msg, to) _, err := m.udpListener.WriteTo(msg, addr)
return err return err
} }

View File

@ -261,7 +261,7 @@ func (m *Memberlist) probeNode(node *nodeState) {
} }
compound := makeCompoundMessage(msgs) compound := makeCompoundMessage(msgs)
if err := m.rawSendMsgUDP(destAddr, compound.Bytes()); err != nil { if err := m.rawSendMsgUDP(destAddr, &node.Node, compound.Bytes()); err != nil {
m.logger.Printf("[ERR] memberlist: Failed to send compound ping and suspect message to %s: %s", destAddr, err) m.logger.Printf("[ERR] memberlist: Failed to send compound ping and suspect message to %s: %s", destAddr, err)
return return
} }
@ -484,7 +484,7 @@ func (m *Memberlist) gossip() {
// Send the compound message // Send the compound message
destAddr := &net.UDPAddr{IP: node.Addr, Port: int(node.Port)} destAddr := &net.UDPAddr{IP: node.Addr, Port: int(node.Port)}
if err := m.rawSendMsgUDP(destAddr, compound.Bytes()); err != nil { if err := m.rawSendMsgUDP(destAddr, &node.Node, compound.Bytes()); err != nil {
m.logger.Printf("[ERR] memberlist: Failed to send gossip to %s: %s", destAddr, err) m.logger.Printf("[ERR] memberlist: Failed to send gossip to %s: %s", destAddr, err)
} }
} }

View File

@ -327,10 +327,18 @@ func isLoopbackIP(ip_str string) bool {
return loopbackBlock.Contains(ip) return loopbackBlock.Contains(ip)
} }
// Given a string of the form "host", "host:port", or "[ipv6::address]:port", // Given a string of the form "host", "host:port",
// "ipv6::addr" or "[ipv6::address]:port",
// return true if the string includes a port. // return true if the string includes a port.
func hasPort(s string) bool { func hasPort(s string) bool {
return strings.LastIndex(s, ":") > strings.LastIndex(s, "]") last := strings.LastIndex(s, ":")
if last == -1 {
return false
}
if s[0] == '[' {
return s[last-1] == ']'
}
return strings.Index(s, ":") == last
} }
// compressPayload takes an opaque input buffer, compresses it // compressPayload takes an opaque input buffer, compresses it

6
vendor/vendor.json vendored
View File

@ -528,10 +528,10 @@
"revisionTime": "2015-06-09T07:04:31Z" "revisionTime": "2015-06-09T07:04:31Z"
}, },
{ {
"checksumSHA1": "AY1/cRsuWpoJMG0J821TqFo9nDE=", "checksumSHA1": "F2jm1h5jRic/Q0e3UEk4aqzS7k0=",
"path": "github.com/hashicorp/memberlist", "path": "github.com/hashicorp/memberlist",
"revision": "0c5ba075f8520c65572f001331a1a43b756e01d7", "revision": "56f5fd70afa73f13bbe529192aeb3dc7bc4bc960",
"revisionTime": "2016-08-12T18:27:57Z" "revisionTime": "2016-12-05T22:01:58Z"
}, },
{ {
"checksumSHA1": "qnlqWJYV81ENr61SZk9c65R1mDo=", "checksumSHA1": "qnlqWJYV81ENr61SZk9c65R1mDo=",