consul: allow returning custom error for merge delegate

This commit is contained in:
Ryan Uber 2015-02-22 18:24:10 -08:00
parent 6f0fecc73c
commit 1200e25ec9
3 changed files with 12 additions and 18 deletions

View File

@ -139,7 +139,7 @@ func (c *Client) setupSerf(conf *serf.Config, ch chan serf.Event, path string) (
conf.SnapshotPath = filepath.Join(c.config.DataDir, path) conf.SnapshotPath = filepath.Join(c.config.DataDir, path)
conf.ProtocolVersion = protocolVersionMap[c.config.ProtocolVersion] conf.ProtocolVersion = protocolVersionMap[c.config.ProtocolVersion]
conf.RejoinAfterLeave = c.config.RejoinAfterLeave conf.RejoinAfterLeave = c.config.RejoinAfterLeave
conf.Merge = &lanMergeDelegate{logger: c.logger, dc: c.config.Datacenter} conf.Merge = &lanMergeDelegate{dc: c.config.Datacenter}
if err := ensurePath(conf.SnapshotPath, false); err != nil { if err := ensurePath(conf.SnapshotPath, false); err != nil {
return nil, err return nil, err
} }

View File

@ -1,7 +1,7 @@
package consul package consul
import ( import (
"log" "fmt"
"github.com/hashicorp/serf/serf" "github.com/hashicorp/serf/serf"
) )
@ -10,47 +10,41 @@ import (
// ring. We check that the peers are in the same datacenter and abort the // ring. We check that the peers are in the same datacenter and abort the
// merge if there is a mis-match. // merge if there is a mis-match.
type lanMergeDelegate struct { type lanMergeDelegate struct {
logger *log.Logger
dc string dc string
} }
func (md *lanMergeDelegate) NotifyMerge(members []*serf.Member) (cancel bool) { func (md *lanMergeDelegate) NotifyMerge(members []*serf.Member) error {
for _, m := range members { for _, m := range members {
ok, dc := isConsulNode(*m) ok, dc := isConsulNode(*m)
if ok { if ok {
if dc != md.dc { if dc != md.dc {
md.logger.Printf("[WARN] consul: Canceling cluster merge, member '%s' part of wrong datacenter '%s'", return fmt.Errorf("Member '%s' part of wrong datacenter '%s'",
m.Name, dc) m.Name, dc)
return true
} }
continue continue
} }
ok, parts := isConsulServer(*m) ok, parts := isConsulServer(*m)
if ok && parts.Datacenter != md.dc { if ok && parts.Datacenter != md.dc {
md.logger.Printf("[WARN] consul: Canceling cluster merge, member '%s' part of wrong datacenter '%s'", return fmt.Errorf("Member '%s' part of wrong datacenter '%s'",
m.Name, parts.Datacenter) m.Name, parts.Datacenter)
return true
} }
} }
return false return nil
} }
// wanMergeDelegate is used to handle a cluster merge on the WAN gossip // wanMergeDelegate is used to handle a cluster merge on the WAN gossip
// ring. We check that the peers are server nodes and abort the merge // ring. We check that the peers are server nodes and abort the merge
// otherwise. // otherwise.
type wanMergeDelegate struct { type wanMergeDelegate struct {
logger *log.Logger
} }
func (md *wanMergeDelegate) NotifyMerge(members []*serf.Member) (cancel bool) { func (md *wanMergeDelegate) NotifyMerge(members []*serf.Member) error {
for _, m := range members { for _, m := range members {
ok, _ := isConsulServer(*m) ok, _ := isConsulServer(*m)
if !ok { if !ok {
md.logger.Printf("[WARN] consul: Canceling cluster merge, member '%s' is not a server", return fmt.Errorf("Member '%s' is not a server", m.Name)
m.Name)
return true
} }
} }
return false return nil
} }

View File

@ -311,9 +311,9 @@ func (s *Server) setupSerf(conf *serf.Config, ch chan serf.Event, path string, w
conf.ProtocolVersion = protocolVersionMap[s.config.ProtocolVersion] conf.ProtocolVersion = protocolVersionMap[s.config.ProtocolVersion]
conf.RejoinAfterLeave = s.config.RejoinAfterLeave conf.RejoinAfterLeave = s.config.RejoinAfterLeave
if wan { if wan {
conf.Merge = &wanMergeDelegate{logger: s.logger} conf.Merge = &wanMergeDelegate{}
} else { } else {
conf.Merge = &lanMergeDelegate{logger: s.logger, dc: s.config.Datacenter} conf.Merge = &lanMergeDelegate{dc: s.config.Datacenter}
} }
// Until Consul supports this fully, we disable automatic resolution. // Until Consul supports this fully, we disable automatic resolution.