Merge pull request #2574 from hashicorp/f-udp-checksum

Update memberlist vendor deps
This commit is contained in:
Kyle Havlovitz 2016-12-06 14:20:29 -05:00 committed by GitHub
commit 6cd5e4d491
6 changed files with 69 additions and 17 deletions

View File

@ -179,6 +179,11 @@ type Config struct {
// behavior for using LogOutput. You cannot specify both LogOutput and Logger
// at the same time.
Logger *log.Logger
// Size of Memberlist's internal channel which handles UDP messages. The
// size of this determines the size of the queue which Memberlist will keep
// while UDP messages are handled.
HandoffQueueDepth int
}
// DefaultLANConfig returns a sane set of configurations for Memberlist.
@ -216,6 +221,8 @@ func DefaultLANConfig() *Config {
Keyring: nil,
DNSConfigPath: "/etc/resolv.conf",
HandoffQueueDepth: 1024,
}
}

View File

@ -129,7 +129,7 @@ func newMemberlist(conf *Config) (*Memberlist, error) {
leaveBroadcast: make(chan struct{}, 1),
udpListener: udpLn,
tcpListener: tcpLn,
handoff: make(chan msgHandoff, 1024),
handoff: make(chan msgHandoff, conf.HandoffQueueDepth),
nodeMap: make(map[string]*nodeState),
nodeTimers: make(map[string]*suspicion),
awareness: newAwareness(conf.AwarenessMaxMultiplier),
@ -496,7 +496,7 @@ func (m *Memberlist) SendTo(to net.Addr, msg []byte) error {
buf = append(buf, msg...)
// Send the message
return m.rawSendMsgUDP(to, buf)
return m.rawSendMsgUDP(to, nil, buf)
}
// SendToUDP is used to directly send a message to another node, without
@ -513,7 +513,7 @@ func (m *Memberlist) SendToUDP(to *Node, msg []byte) error {
// Send the message
destAddr := &net.UDPAddr{IP: to.Addr, Port: int(to.Port)}
return m.rawSendMsgUDP(destAddr, buf)
return m.rawSendMsgUDP(destAddr, to, buf)
}
// SendToTCP is used to directly send a message to another node, without

View File

@ -5,6 +5,7 @@ import (
"bytes"
"encoding/binary"
"fmt"
"hash/crc32"
"io"
"net"
"time"
@ -32,7 +33,7 @@ const (
// understand version 4 or greater.
ProtocolVersion2Compatible = 2
ProtocolVersionMax = 4
ProtocolVersionMax = 5
)
// messageType is an integer ID of a type of message that can be received
@ -53,6 +54,7 @@ const (
compressMsg
encryptMsg
nackRespMsg
hasCrcMsg
)
// compressionType is used to specify the compression algorithm
@ -338,8 +340,18 @@ func (m *Memberlist) ingestPacket(buf []byte, from net.Addr, timestamp time.Time
buf = plain
}
// Handle the command
m.handleCommand(buf, from, timestamp)
// See if there's a checksum included to verify the contents of the message
if len(buf) >= 5 && messageType(buf[0]) == hasCrcMsg {
crc := crc32.ChecksumIEEE(buf[5:])
expected := binary.BigEndian.Uint32(buf[1:5])
if crc != expected {
m.logger.Printf("[WARN] memberlist: Got invalid checksum for UDP packet: %x, %x", crc, expected)
return
}
m.handleCommand(buf[5:], from, timestamp)
} else {
m.handleCommand(buf, from, timestamp)
}
}
func (m *Memberlist) handleCommand(buf []byte, from net.Addr, timestamp time.Time) {
@ -601,7 +613,7 @@ func (m *Memberlist) sendMsg(to net.Addr, msg []byte) error {
// Fast path if nothing to piggypack
if len(extra) == 0 {
return m.rawSendMsgUDP(to, msg)
return m.rawSendMsgUDP(to, nil, msg)
}
// Join all the messages
@ -613,11 +625,11 @@ func (m *Memberlist) sendMsg(to net.Addr, msg []byte) error {
compound := makeCompoundMessage(msgs)
// Send the message
return m.rawSendMsgUDP(to, compound.Bytes())
return m.rawSendMsgUDP(to, nil, compound.Bytes())
}
// rawSendMsgUDP is used to send a UDP message to another host without modification
func (m *Memberlist) rawSendMsgUDP(to net.Addr, msg []byte) error {
func (m *Memberlist) rawSendMsgUDP(addr net.Addr, node *Node, msg []byte) error {
// Check if we have compression enabled
if m.config.EnableCompression {
buf, err := compressPayload(msg)
@ -631,6 +643,31 @@ func (m *Memberlist) rawSendMsgUDP(to net.Addr, msg []byte) error {
}
}
// Try to look up the destination node
if node == nil {
toAddr, _, err := net.SplitHostPort(addr.String())
if err != nil {
m.logger.Printf("[ERR] memberlist: Failed to parse address %q: %v", addr.String(), err)
return err
}
m.nodeLock.RLock()
nodeState, ok := m.nodeMap[toAddr]
m.nodeLock.RUnlock()
if ok {
node = &nodeState.Node
}
}
// Add a CRC to the end of the payload if the recipient understands
// ProtocolVersion >= 5
if node != nil && node.PMax >= 5 {
crc := crc32.ChecksumIEEE(msg)
header := make([]byte, 5, 5+len(msg))
header[0] = byte(hasCrcMsg)
binary.BigEndian.PutUint32(header[1:], crc)
msg = append(header, msg...)
}
// Check if we have encryption enabled
if m.config.EncryptionEnabled() {
// Encrypt the payload
@ -645,7 +682,7 @@ func (m *Memberlist) rawSendMsgUDP(to net.Addr, msg []byte) error {
}
metrics.IncrCounter([]string{"memberlist", "udp", "sent"}, float32(len(msg)))
_, err := m.udpListener.WriteTo(msg, to)
_, err := m.udpListener.WriteTo(msg, addr)
return err
}

View File

@ -261,7 +261,7 @@ func (m *Memberlist) probeNode(node *nodeState) {
}
compound := makeCompoundMessage(msgs)
if err := m.rawSendMsgUDP(destAddr, compound.Bytes()); err != nil {
if err := m.rawSendMsgUDP(destAddr, &node.Node, compound.Bytes()); err != nil {
m.logger.Printf("[ERR] memberlist: Failed to send compound ping and suspect message to %s: %s", destAddr, err)
return
}
@ -484,7 +484,7 @@ func (m *Memberlist) gossip() {
// Send the compound message
destAddr := &net.UDPAddr{IP: node.Addr, Port: int(node.Port)}
if err := m.rawSendMsgUDP(destAddr, compound.Bytes()); err != nil {
if err := m.rawSendMsgUDP(destAddr, &node.Node, compound.Bytes()); err != nil {
m.logger.Printf("[ERR] memberlist: Failed to send gossip to %s: %s", destAddr, err)
}
}

View File

@ -327,10 +327,18 @@ func isLoopbackIP(ip_str string) bool {
return loopbackBlock.Contains(ip)
}
// Given a string of the form "host", "host:port", or "[ipv6::address]:port",
// Given a string of the form "host", "host:port",
// "ipv6::addr" or "[ipv6::address]:port",
// return true if the string includes a port.
func hasPort(s string) bool {
return strings.LastIndex(s, ":") > strings.LastIndex(s, "]")
last := strings.LastIndex(s, ":")
if last == -1 {
return false
}
if s[0] == '[' {
return s[last-1] == ']'
}
return strings.Index(s, ":") == last
}
// compressPayload takes an opaque input buffer, compresses it

6
vendor/vendor.json vendored
View File

@ -528,10 +528,10 @@
"revisionTime": "2015-06-09T07:04:31Z"
},
{
"checksumSHA1": "AY1/cRsuWpoJMG0J821TqFo9nDE=",
"checksumSHA1": "F2jm1h5jRic/Q0e3UEk4aqzS7k0=",
"path": "github.com/hashicorp/memberlist",
"revision": "0c5ba075f8520c65572f001331a1a43b756e01d7",
"revisionTime": "2016-08-12T18:27:57Z"
"revision": "56f5fd70afa73f13bbe529192aeb3dc7bc4bc960",
"revisionTime": "2016-12-05T22:01:58Z"
},
{
"checksumSHA1": "qnlqWJYV81ENr61SZk9c65R1mDo=",