Updates Serf library to get relay fixes.

https://github.com/hashicorp/serf/pull/447
This commit is contained in:
James Phillips 2017-02-06 08:57:36 -08:00
parent f228812746
commit 0797d4f92b
No known key found for this signature in database
GPG Key ID: 77183E682AC5FC11
4 changed files with 98 additions and 75 deletions

View File

@ -2,7 +2,6 @@ package serf
import (
"fmt"
"math/rand"
"net"
"sync"
"time"
@ -131,12 +130,12 @@ func (q *Query) Respond(buf []byte) error {
// Check if we've already responded
if q.deadline.IsZero() {
return fmt.Errorf("Response already sent")
return fmt.Errorf("response already sent")
}
// Ensure we aren't past our response deadline
if time.Now().After(q.deadline) {
return fmt.Errorf("Response is past the deadline")
return fmt.Errorf("response is past the deadline")
}
// Create response
@ -148,10 +147,9 @@ func (q *Query) Respond(buf []byte) error {
}
// Send a direct response
{
raw, err := encodeMessage(messageQueryResponseType, &resp)
if err != nil {
return fmt.Errorf("Failed to format response: %v", err)
return fmt.Errorf("failed to format response: %v", err)
}
// Check the size limit
@ -159,72 +157,18 @@ func (q *Query) Respond(buf []byte) error {
return fmt.Errorf("response exceeds limit of %d bytes", q.serf.config.QueryResponseSizeLimit)
}
// Send the response directly to the originator
addr := net.UDPAddr{IP: q.addr, Port: int(q.port)}
if err := q.serf.memberlist.SendTo(&addr, raw); err != nil {
return err
}
}
// Relay the response through up to relayFactor other nodes
members := q.serf.Members()
if len(members) > 2 {
addr := net.UDPAddr{IP: q.addr, Port: int(q.port)}
raw, err := encodeRelayMessage(messageQueryResponseType, addr, &resp)
if err != nil {
return fmt.Errorf("Failed to format relayed response: %v", err)
}
// Check the size limit
if len(raw) > q.serf.config.QueryResponseSizeLimit {
return fmt.Errorf("relayed response exceeds limit of %d bytes", q.serf.config.QueryResponseSizeLimit)
}
relayMembers := kRandomMembers(int(q.relayFactor), members, func(m Member) bool {
return m.Status != StatusAlive || m.ProtocolMax < 5 || m.Name == q.serf.LocalMember().Name
})
for _, m := range relayMembers {
relayAddr := net.UDPAddr{IP: m.Addr, Port: int(m.Port)}
if err := q.serf.memberlist.SendTo(&relayAddr, raw); err != nil {
if err := q.serf.relayResponse(q.relayFactor, addr, &resp); err != nil {
return err
}
}
}
// Clear the deadline, responses sent
q.deadline = time.Time{}
return nil
}
// kRandomMembers selects up to k members from a given list, optionally
// filtering by the given filterFunc
func kRandomMembers(k int, members []Member, filterFunc func(Member) bool) []Member {
n := len(members)
kMembers := make([]Member, 0, k)
OUTER:
// Probe up to 3*n times, with large n this is not necessary
// since k << n, but with small n we want search to be
// exhaustive
for i := 0; i < 3*n && len(kMembers) < k; i++ {
// Get random member
idx := rand.Intn(n)
member := members[idx]
// Give the filter a shot at it.
if filterFunc != nil && filterFunc(member) {
continue OUTER
}
// Check if we have this member already
for j := 0; j < len(kMembers); j++ {
if member.Name == kMembers[j].Name {
continue OUTER
}
}
// Append the member
kMembers = append(kMembers, member)
}
return kMembers
}

View File

@ -1,7 +1,10 @@
package serf
import (
"fmt"
"math"
"math/rand"
"net"
"regexp"
"sync"
"time"
@ -218,3 +221,74 @@ func (s *Serf) shouldProcessQuery(filters [][]byte) bool {
}
return true
}
// relayResponse will relay a copy of the given response to up to relayFactor
// other members.
func (s *Serf) relayResponse(relayFactor uint8, addr net.UDPAddr, resp *messageQueryResponse) error {
if relayFactor == 0 {
return nil
}
// Needs to be worth it; we need to have at least relayFactor *other*
// nodes. If you have a tiny cluster then the relayFactor shouldn't
// be needed.
members := s.Members()
if len(members) < int(relayFactor)+1 {
return nil
}
// Prep the relay message, which is a wrapped version of the original.
raw, err := encodeRelayMessage(messageQueryResponseType, addr, &resp)
if err != nil {
return fmt.Errorf("failed to format relayed response: %v", err)
}
if len(raw) > s.config.QueryResponseSizeLimit {
return fmt.Errorf("relayed response exceeds limit of %d bytes", s.config.QueryResponseSizeLimit)
}
// Relay to a random set of peers.
localName := s.LocalMember().Name
relayMembers := kRandomMembers(int(relayFactor), members, func(m Member) bool {
return m.Status != StatusAlive || m.ProtocolMax < 5 || m.Name == localName
})
for _, m := range relayMembers {
relayAddr := net.UDPAddr{IP: m.Addr, Port: int(m.Port)}
if err := s.memberlist.SendTo(&relayAddr, raw); err != nil {
return fmt.Errorf("failed to send relay response: %v", err)
}
}
return nil
}
// kRandomMembers selects up to k members from a given list, optionally
// filtering by the given filterFunc
func kRandomMembers(k int, members []Member, filterFunc func(Member) bool) []Member {
n := len(members)
kMembers := make([]Member, 0, k)
OUTER:
// Probe up to 3*n times, with large n this is not necessary
// since k << n, but with small n we want search to be
// exhaustive
for i := 0; i < 3*n && len(kMembers) < k; i++ {
// Get random member
idx := rand.Intn(n)
member := members[idx]
// Give the filter a shot at it.
if filterFunc != nil && filterFunc(member) {
continue OUTER
}
// Check if we have this member already
for j := 0; j < len(kMembers); j++ {
if member.Name == kMembers[j].Name {
continue OUTER
}
}
// Append the member
kMembers = append(kMembers, member)
}
return kMembers
}

View File

@ -1238,6 +1238,9 @@ func (s *Serf) handleQuery(query *messageQuery) bool {
if err := s.memberlist.SendTo(&addr, raw); err != nil {
s.logger.Printf("[ERR] serf: failed to send ack: %v", err)
}
if err := s.relayResponse(query.RelayFactor, addr, &ack); err != nil {
s.logger.Printf("[ERR] serf: failed to relay ack: %v", err)
}
}
}
@ -1286,6 +1289,7 @@ func (s *Serf) handleQueryResponse(resp *messageQueryResponse) {
if resp.Ack() {
// Exit early if this is a duplicate ack
if _, ok := query.acks[resp.From]; ok {
metrics.IncrCounter([]string{"serf", "query_duplicate_acks"}, 1)
return
}
@ -1299,6 +1303,7 @@ func (s *Serf) handleQueryResponse(resp *messageQueryResponse) {
} else {
// Exit early if this is a duplicate response
if _, ok := query.responses[resp.From]; ok {
metrics.IncrCounter([]string{"serf", "query_duplicate_responses"}, 1)
return
}

6
vendor/vendor.json vendored
View File

@ -625,11 +625,11 @@
"revisionTime": "2016-08-09T01:42:04Z"
},
{
"checksumSHA1": "EhESUBqb9Kot4rzZu2l/oAJoYCU=",
"checksumSHA1": "E63/tz2qjNJ6+hyGi9AoYb0sH9s=",
"comment": "v0.7.0-66-g6c4672d",
"path": "github.com/hashicorp/serf/serf",
"revision": "34e94dbd8faa991710b442c22ad6ad37c8b44c3b",
"revisionTime": "2017-02-02T01:56:25Z"
"revision": "f85661e5323286a0406cabeb0ad515962c1780b7",
"revisionTime": "2017-02-06T16:55:42Z"
},
{
"checksumSHA1": "ZhK6IO2XN81Y+3RAjTcVm1Ic7oU=",