From 4be2ab1a75350e339e6b14c24657d662d40e8251 Mon Sep 17 00:00:00 2001 From: James Phillips Date: Sun, 7 Feb 2016 13:12:42 -0800 Subject: [PATCH] Moves tagged wan address to be managed by anti-entropy, not serf. --- command/agent/local.go | 52 ++++++++++++++++++++++++++++++++ command/agent/local_test.go | 60 +++++++++++++++++++++++++++++++++++++ consul/client.go | 3 -- consul/leader.go | 13 +------- consul/server.go | 3 -- 5 files changed, 113 insertions(+), 18 deletions(-) diff --git a/command/agent/local.go b/command/agent/local.go index 15752555a7..5815367229 100644 --- a/command/agent/local.go +++ b/command/agent/local.go @@ -3,6 +3,7 @@ package agent import ( "fmt" "log" + "reflect" "strings" "sync" "sync/atomic" @@ -45,6 +46,10 @@ type localState struct { // iface is the consul interface to use for keeping in sync iface consul.Interface + // nodeInfoInSync tracks whether the server has our correct top-level + // node information in sync (currently only used for tagged addresses) + nodeInfoInSync bool + // Services tracks the local services services map[string]*structs.NodeService serviceStatus map[string]syncStatus @@ -361,6 +366,13 @@ func (l *localState) setSyncState() error { l.Lock() defer l.Unlock() + // Check the node info (currently limited to tagged addresses since + // everything else is managed by the Serf layer) + if !reflect.DeepEqual(out1.NodeServices.Node.TaggedAddresses, l.config.TaggedAddresses) { + l.nodeInfoInSync = false + } + + // Check all our services services := make(map[string]*structs.NodeService) if out1.NodeServices != nil { services = out1.NodeServices.Services @@ -440,6 +452,10 @@ func (l *localState) syncChanges() error { l.Lock() defer l.Unlock() + // We will do node-level info syncing at the end, since it will get + // updated by a service or check sync anyway, given how the register + // API works. + // Sync the services for id, status := range l.serviceStatus { if status.remoteDelete { @@ -475,6 +491,15 @@ func (l *localState) syncChanges() error { l.logger.Printf("[DEBUG] agent: Check '%s' in sync", id) } } + + // Now sync the node level info if we need to, and didn't do any of + // the other sync operations. + if !l.nodeInfoInSync { + if err := l.syncNodeInfo(); err != nil { + return err + } + } + return nil } @@ -554,6 +579,9 @@ func (l *localState) syncService(id string) error { err := l.iface.RPC("Catalog.Register", &req, &out) if err == nil { l.serviceStatus[id] = syncStatus{inSync: true} + // Given how the register API works, this info is also updated + // every time we sync a service. + l.nodeInfoInSync = true l.logger.Printf("[INFO] agent: Synced service '%s'", id) for _, check := range checks { l.checkStatus[check.CheckID] = syncStatus{inSync: true} @@ -593,6 +621,9 @@ func (l *localState) syncCheck(id string) error { err := l.iface.RPC("Catalog.Register", &req, &out) if err == nil { l.checkStatus[id] = syncStatus{inSync: true} + // Given how the register API works, this info is also updated + // every time we sync a service. + l.nodeInfoInSync = true l.logger.Printf("[INFO] agent: Synced check '%s'", id) } else if strings.Contains(err.Error(), permissionDenied) { l.checkStatus[id] = syncStatus{inSync: true} @@ -601,3 +632,24 @@ func (l *localState) syncCheck(id string) error { } return err } + +func (l *localState) syncNodeInfo() error { + req := structs.RegisterRequest{ + Datacenter: l.config.Datacenter, + Node: l.config.NodeName, + Address: l.config.AdvertiseAddr, + TaggedAddresses: l.config.TaggedAddresses, + WriteRequest: structs.WriteRequest{Token: l.config.ACLToken}, + } + var out struct{} + err := l.iface.RPC("Catalog.Register", &req, &out) + if err == nil { + l.nodeInfoInSync = true + l.logger.Printf("[INFO] agent: Synced node info") + } else if strings.Contains(err.Error(), permissionDenied) { + l.nodeInfoInSync = true + l.logger.Printf("[WARN] agent: Node info update blocked by ACLs") + return nil + } + return err +} diff --git a/command/agent/local_test.go b/command/agent/local_test.go index 58a2f23294..f7e453b0f8 100644 --- a/command/agent/local_test.go +++ b/command/agent/local_test.go @@ -731,6 +731,66 @@ func TestAgentAntiEntropy_Check_DeferSync(t *testing.T) { }) } +func TestAgentAntiEntropy_NodeInfo(t *testing.T) { + conf := nextConfig() + dir, agent := makeAgent(t, conf) + defer os.RemoveAll(dir) + defer agent.Shutdown() + + testutil.WaitForLeader(t, agent.RPC, "dc1") + + // Register info + args := &structs.RegisterRequest{ + Datacenter: "dc1", + Node: agent.config.NodeName, + Address: "127.0.0.1", + } + var out struct{} + if err := agent.RPC("Catalog.Register", args, &out); err != nil { + t.Fatalf("err: %v", err) + } + + // Trigger anti-entropy run and wait + agent.StartSync() + time.Sleep(200 * time.Millisecond) + + // Verify that we are in sync + req := structs.NodeSpecificRequest{ + Datacenter: "dc1", + Node: agent.config.NodeName, + } + var services structs.IndexedNodeServices + if err := agent.RPC("Catalog.NodeServices", &req, &services); err != nil { + t.Fatalf("err: %v", err) + } + + // Make sure we synced our node info - this should have ridden on the + // "consul" service sync + addrs := services.NodeServices.Node.TaggedAddresses + if len(addrs) == 0 || !reflect.DeepEqual(addrs, conf.TaggedAddresses) { + t.Fatalf("bad: %v", addrs) + } + + // Blow away the catalog version of the node info + if err := agent.RPC("Catalog.Register", args, &out); err != nil { + t.Fatalf("err: %v", err) + } + + // Trigger anti-entropy run and wait + agent.StartSync() + time.Sleep(200 * time.Millisecond) + + // Verify that we are in sync - this should have been a sync of just the + // node info + if err := agent.RPC("Catalog.NodeServices", &req, &services); err != nil { + t.Fatalf("err: %v", err) + } + addrs = services.NodeServices.Node.TaggedAddresses + if len(addrs) == 0 || !reflect.DeepEqual(addrs, conf.TaggedAddresses) { + t.Fatalf("bad: %v", addrs) + } +} + func TestAgentAntiEntropy_deleteService_fails(t *testing.T) { l := new(localState) if err := l.deleteService(""); err == nil { diff --git a/consul/client.go b/consul/client.go index 89aaf438af..e7155a1444 100644 --- a/consul/client.go +++ b/consul/client.go @@ -181,9 +181,6 @@ func (c *Client) setupSerf(conf *serf.Config, ch chan serf.Event, path string) ( conf.RejoinAfterLeave = c.config.RejoinAfterLeave conf.Merge = &lanMergeDelegate{dc: c.config.Datacenter} conf.DisableCoordinates = c.config.DisableCoordinates - if wanAddr := c.config.SerfWANConfig.MemberlistConfig.AdvertiseAddr; wanAddr != "" { - conf.Tags["wan_addr"] = wanAddr - } if err := ensurePath(conf.SnapshotPath, false); err != nil { return nil, err } diff --git a/consul/leader.go b/consul/leader.go index 374e7a9a22..55c487b4fd 100644 --- a/consul/leader.go +++ b/consul/leader.go @@ -380,11 +380,6 @@ func (s *Server) handleAliveMember(member serf.Member) error { return err } if node != nil && node.Address == member.Addr.String() { - // Check if the WAN address was updated - if node.TaggedAddresses["wan"] != member.Tags["wan_addr"] { - goto AFTER_CHECK - } - // Check if the associated service is available if service != nil { match := false @@ -423,10 +418,7 @@ AFTER_CHECK: Datacenter: s.config.Datacenter, Node: member.Name, Address: member.Addr.String(), - TaggedAddresses: map[string]string{ - "wan": member.Tags["wan_addr"], - }, - Service: service, + Service: service, Check: &structs.HealthCheck{ Node: member.Name, CheckID: SerfCheckID, @@ -468,9 +460,6 @@ func (s *Server) handleFailedMember(member serf.Member) error { Datacenter: s.config.Datacenter, Node: member.Name, Address: member.Addr.String(), - TaggedAddresses: map[string]string{ - "wan": member.Tags["wan_addr"], - }, Check: &structs.HealthCheck{ Node: member.Name, CheckID: SerfCheckID, diff --git a/consul/server.go b/consul/server.go index 36371e60f3..7a59cb5945 100644 --- a/consul/server.go +++ b/consul/server.go @@ -299,9 +299,6 @@ func (s *Server) setupSerf(conf *serf.Config, ch chan serf.Event, path string, w if s.config.BootstrapExpect != 0 { conf.Tags["expect"] = fmt.Sprintf("%d", s.config.BootstrapExpect) } - if wanAddr := s.config.SerfWANConfig.MemberlistConfig.AdvertiseAddr; wanAddr != "" { - conf.Tags["wan_addr"] = wanAddr - } conf.MemberlistConfig.LogOutput = s.config.LogOutput conf.LogOutput = s.config.LogOutput conf.EventCh = ch