Moves tagged wan address to be managed by anti-entropy, not serf.

This commit is contained in:
James Phillips 2016-02-07 13:12:42 -08:00
parent 33462ebea9
commit 4be2ab1a75
5 changed files with 113 additions and 18 deletions

View File

@ -3,6 +3,7 @@ package agent
import ( import (
"fmt" "fmt"
"log" "log"
"reflect"
"strings" "strings"
"sync" "sync"
"sync/atomic" "sync/atomic"
@ -45,6 +46,10 @@ type localState struct {
// iface is the consul interface to use for keeping in sync // iface is the consul interface to use for keeping in sync
iface consul.Interface iface consul.Interface
// nodeInfoInSync tracks whether the server has our correct top-level
// node information in sync (currently only used for tagged addresses)
nodeInfoInSync bool
// Services tracks the local services // Services tracks the local services
services map[string]*structs.NodeService services map[string]*structs.NodeService
serviceStatus map[string]syncStatus serviceStatus map[string]syncStatus
@ -361,6 +366,13 @@ func (l *localState) setSyncState() error {
l.Lock() l.Lock()
defer l.Unlock() defer l.Unlock()
// Check the node info (currently limited to tagged addresses since
// everything else is managed by the Serf layer)
if !reflect.DeepEqual(out1.NodeServices.Node.TaggedAddresses, l.config.TaggedAddresses) {
l.nodeInfoInSync = false
}
// Check all our services
services := make(map[string]*structs.NodeService) services := make(map[string]*structs.NodeService)
if out1.NodeServices != nil { if out1.NodeServices != nil {
services = out1.NodeServices.Services services = out1.NodeServices.Services
@ -440,6 +452,10 @@ func (l *localState) syncChanges() error {
l.Lock() l.Lock()
defer l.Unlock() defer l.Unlock()
// We will do node-level info syncing at the end, since it will get
// updated by a service or check sync anyway, given how the register
// API works.
// Sync the services // Sync the services
for id, status := range l.serviceStatus { for id, status := range l.serviceStatus {
if status.remoteDelete { if status.remoteDelete {
@ -475,6 +491,15 @@ func (l *localState) syncChanges() error {
l.logger.Printf("[DEBUG] agent: Check '%s' in sync", id) l.logger.Printf("[DEBUG] agent: Check '%s' in sync", id)
} }
} }
// Now sync the node level info if we need to, and didn't do any of
// the other sync operations.
if !l.nodeInfoInSync {
if err := l.syncNodeInfo(); err != nil {
return err
}
}
return nil return nil
} }
@ -554,6 +579,9 @@ func (l *localState) syncService(id string) error {
err := l.iface.RPC("Catalog.Register", &req, &out) err := l.iface.RPC("Catalog.Register", &req, &out)
if err == nil { if err == nil {
l.serviceStatus[id] = syncStatus{inSync: true} l.serviceStatus[id] = syncStatus{inSync: true}
// Given how the register API works, this info is also updated
// every time we sync a service.
l.nodeInfoInSync = true
l.logger.Printf("[INFO] agent: Synced service '%s'", id) l.logger.Printf("[INFO] agent: Synced service '%s'", id)
for _, check := range checks { for _, check := range checks {
l.checkStatus[check.CheckID] = syncStatus{inSync: true} l.checkStatus[check.CheckID] = syncStatus{inSync: true}
@ -593,6 +621,9 @@ func (l *localState) syncCheck(id string) error {
err := l.iface.RPC("Catalog.Register", &req, &out) err := l.iface.RPC("Catalog.Register", &req, &out)
if err == nil { if err == nil {
l.checkStatus[id] = syncStatus{inSync: true} l.checkStatus[id] = syncStatus{inSync: true}
// Given how the register API works, this info is also updated
// every time we sync a service.
l.nodeInfoInSync = true
l.logger.Printf("[INFO] agent: Synced check '%s'", id) l.logger.Printf("[INFO] agent: Synced check '%s'", id)
} else if strings.Contains(err.Error(), permissionDenied) { } else if strings.Contains(err.Error(), permissionDenied) {
l.checkStatus[id] = syncStatus{inSync: true} l.checkStatus[id] = syncStatus{inSync: true}
@ -601,3 +632,24 @@ func (l *localState) syncCheck(id string) error {
} }
return err return err
} }
func (l *localState) syncNodeInfo() error {
req := structs.RegisterRequest{
Datacenter: l.config.Datacenter,
Node: l.config.NodeName,
Address: l.config.AdvertiseAddr,
TaggedAddresses: l.config.TaggedAddresses,
WriteRequest: structs.WriteRequest{Token: l.config.ACLToken},
}
var out struct{}
err := l.iface.RPC("Catalog.Register", &req, &out)
if err == nil {
l.nodeInfoInSync = true
l.logger.Printf("[INFO] agent: Synced node info")
} else if strings.Contains(err.Error(), permissionDenied) {
l.nodeInfoInSync = true
l.logger.Printf("[WARN] agent: Node info update blocked by ACLs")
return nil
}
return err
}

View File

@ -731,6 +731,66 @@ func TestAgentAntiEntropy_Check_DeferSync(t *testing.T) {
}) })
} }
func TestAgentAntiEntropy_NodeInfo(t *testing.T) {
conf := nextConfig()
dir, agent := makeAgent(t, conf)
defer os.RemoveAll(dir)
defer agent.Shutdown()
testutil.WaitForLeader(t, agent.RPC, "dc1")
// Register info
args := &structs.RegisterRequest{
Datacenter: "dc1",
Node: agent.config.NodeName,
Address: "127.0.0.1",
}
var out struct{}
if err := agent.RPC("Catalog.Register", args, &out); err != nil {
t.Fatalf("err: %v", err)
}
// Trigger anti-entropy run and wait
agent.StartSync()
time.Sleep(200 * time.Millisecond)
// Verify that we are in sync
req := structs.NodeSpecificRequest{
Datacenter: "dc1",
Node: agent.config.NodeName,
}
var services structs.IndexedNodeServices
if err := agent.RPC("Catalog.NodeServices", &req, &services); err != nil {
t.Fatalf("err: %v", err)
}
// Make sure we synced our node info - this should have ridden on the
// "consul" service sync
addrs := services.NodeServices.Node.TaggedAddresses
if len(addrs) == 0 || !reflect.DeepEqual(addrs, conf.TaggedAddresses) {
t.Fatalf("bad: %v", addrs)
}
// Blow away the catalog version of the node info
if err := agent.RPC("Catalog.Register", args, &out); err != nil {
t.Fatalf("err: %v", err)
}
// Trigger anti-entropy run and wait
agent.StartSync()
time.Sleep(200 * time.Millisecond)
// Verify that we are in sync - this should have been a sync of just the
// node info
if err := agent.RPC("Catalog.NodeServices", &req, &services); err != nil {
t.Fatalf("err: %v", err)
}
addrs = services.NodeServices.Node.TaggedAddresses
if len(addrs) == 0 || !reflect.DeepEqual(addrs, conf.TaggedAddresses) {
t.Fatalf("bad: %v", addrs)
}
}
func TestAgentAntiEntropy_deleteService_fails(t *testing.T) { func TestAgentAntiEntropy_deleteService_fails(t *testing.T) {
l := new(localState) l := new(localState)
if err := l.deleteService(""); err == nil { if err := l.deleteService(""); err == nil {

View File

@ -181,9 +181,6 @@ func (c *Client) setupSerf(conf *serf.Config, ch chan serf.Event, path string) (
conf.RejoinAfterLeave = c.config.RejoinAfterLeave conf.RejoinAfterLeave = c.config.RejoinAfterLeave
conf.Merge = &lanMergeDelegate{dc: c.config.Datacenter} conf.Merge = &lanMergeDelegate{dc: c.config.Datacenter}
conf.DisableCoordinates = c.config.DisableCoordinates conf.DisableCoordinates = c.config.DisableCoordinates
if wanAddr := c.config.SerfWANConfig.MemberlistConfig.AdvertiseAddr; wanAddr != "" {
conf.Tags["wan_addr"] = wanAddr
}
if err := ensurePath(conf.SnapshotPath, false); err != nil { if err := ensurePath(conf.SnapshotPath, false); err != nil {
return nil, err return nil, err
} }

View File

@ -380,11 +380,6 @@ func (s *Server) handleAliveMember(member serf.Member) error {
return err return err
} }
if node != nil && node.Address == member.Addr.String() { if node != nil && node.Address == member.Addr.String() {
// Check if the WAN address was updated
if node.TaggedAddresses["wan"] != member.Tags["wan_addr"] {
goto AFTER_CHECK
}
// Check if the associated service is available // Check if the associated service is available
if service != nil { if service != nil {
match := false match := false
@ -423,9 +418,6 @@ AFTER_CHECK:
Datacenter: s.config.Datacenter, Datacenter: s.config.Datacenter,
Node: member.Name, Node: member.Name,
Address: member.Addr.String(), Address: member.Addr.String(),
TaggedAddresses: map[string]string{
"wan": member.Tags["wan_addr"],
},
Service: service, Service: service,
Check: &structs.HealthCheck{ Check: &structs.HealthCheck{
Node: member.Name, Node: member.Name,
@ -468,9 +460,6 @@ func (s *Server) handleFailedMember(member serf.Member) error {
Datacenter: s.config.Datacenter, Datacenter: s.config.Datacenter,
Node: member.Name, Node: member.Name,
Address: member.Addr.String(), Address: member.Addr.String(),
TaggedAddresses: map[string]string{
"wan": member.Tags["wan_addr"],
},
Check: &structs.HealthCheck{ Check: &structs.HealthCheck{
Node: member.Name, Node: member.Name,
CheckID: SerfCheckID, CheckID: SerfCheckID,

View File

@ -299,9 +299,6 @@ func (s *Server) setupSerf(conf *serf.Config, ch chan serf.Event, path string, w
if s.config.BootstrapExpect != 0 { if s.config.BootstrapExpect != 0 {
conf.Tags["expect"] = fmt.Sprintf("%d", s.config.BootstrapExpect) conf.Tags["expect"] = fmt.Sprintf("%d", s.config.BootstrapExpect)
} }
if wanAddr := s.config.SerfWANConfig.MemberlistConfig.AdvertiseAddr; wanAddr != "" {
conf.Tags["wan_addr"] = wanAddr
}
conf.MemberlistConfig.LogOutput = s.config.LogOutput conf.MemberlistConfig.LogOutput = s.config.LogOutput
conf.LogOutput = s.config.LogOutput conf.LogOutput = s.config.LogOutput
conf.EventCh = ch conf.EventCh = ch