diff --git a/agent/consul/serf.go b/agent/consul/serf.go index 16d3f7be31..d1e3d9012e 100644 --- a/agent/consul/serf.go +++ b/agent/consul/serf.go @@ -143,6 +143,9 @@ func (s *Server) lanNodeJoin(me serf.MemberEvent) { s.maybeBootstrap() } + // Update id to address map + s.serverAddressLookup.AddServer(parts.ID, parts.Addr.String()) + // Kick the join flooders. s.FloodNotify() } @@ -274,5 +277,8 @@ func (s *Server) lanNodeFailed(me serf.MemberEvent) { s.localLock.Lock() delete(s.localConsuls, raft.ServerAddress(parts.Addr.String())) s.localLock.Unlock() + + // Update id to address map + s.serverAddressLookup.RemoveServer(parts.ID) } } diff --git a/agent/consul/server.go b/agent/consul/server.go index ba11ace038..f6e263f0e5 100644 --- a/agent/consul/server.go +++ b/agent/consul/server.go @@ -171,6 +171,9 @@ type Server struct { // which SHOULD only consist of Consul servers serfWAN *serf.Serf + // fast lookup from id to server address to provide to the raft transport layer + serverAddressLookup *ServerAddressLookup + // floodLock controls access to floodCh. floodLock sync.RWMutex floodCh []chan struct{} @@ -286,6 +289,7 @@ func NewServerLogger(config *Config, logger *log.Logger, tokens *token.Store) (* ForceTLS: config.VerifyOutgoing, } + //serverAddrLookup = NewServerAddressLookup() // Create server. s := &Server{ autopilotRemoveDeadCh: make(chan struct{}), @@ -304,6 +308,7 @@ func NewServerLogger(config *Config, logger *log.Logger, tokens *token.Store) (* reassertLeaderCh: make(chan chan error), sessionTimers: NewSessionTimers(), tombstoneGC: gc, + serverAddressLookup: NewServerAddressLookup(), shutdownCh: shutdownCh, } @@ -494,7 +499,12 @@ func (s *Server) setupRaft() error { } // Create a transport layer. - transConfig := &raft.NetworkTransportConfig{Stream: s.raftLayer, MaxPool: 3, Timeout: 10 * time.Second, ServerAddressProvider: s} + transConfig := &raft.NetworkTransportConfig{ + Stream: s.raftLayer, + MaxPool: 3, + Timeout: 10 * time.Second, + ServerAddressProvider: s.serverAddressLookup, + } trans := raft.NewNetworkTransportWithConfig(transConfig) s.raftTransport = trans @@ -1049,17 +1059,6 @@ func (s *Server) GetWANCoordinate() (*coordinate.Coordinate, error) { return s.serfWAN.GetCoordinate() } -func (s *Server) ServerAddr(id raft.ServerID) (raft.ServerAddress, error) { - if string(id) == string(s.config.NodeID) { - return raft.ServerAddress(s.config.RPCAddr.String()), nil - } - addr, err := s.router.GetServerAddressByID(s.config.Datacenter, string(id)) - if err != nil { - return "", err - } - return raft.ServerAddress(addr), nil -} - // Atomically sets a readiness state flag when leadership is obtained, to indicate that server is past its barrier write func (s *Server) setConsistentReadReady() { atomic.StoreInt32(&s.readyForConsistentReads, 1) diff --git a/agent/consul/server_address_lookup.go b/agent/consul/server_address_lookup.go new file mode 100644 index 0000000000..82d93a38b1 --- /dev/null +++ b/agent/consul/server_address_lookup.go @@ -0,0 +1,34 @@ +package consul + +import ( + "fmt" + "sync" + + "github.com/hashicorp/raft" +) + +// serverIdToAddress is a map from id to address for servers in the LAN pool. +// used for fast lookup to satisfy the ServerAddressProvider interface +type ServerAddressLookup struct { + serverIdToAddress sync.Map +} + +func NewServerAddressLookup() *ServerAddressLookup { + return &ServerAddressLookup{} +} + +func (sa *ServerAddressLookup) AddServer(id string, address string) { + sa.serverIdToAddress.Store(id, address) +} + +func (sa *ServerAddressLookup) RemoveServer(id string) { + sa.serverIdToAddress.Delete(id) +} + +func (sa *ServerAddressLookup) ServerAddr(id raft.ServerID) (raft.ServerAddress, error) { + val, ok := sa.serverIdToAddress.Load(string(id)) + if !ok { + return "", fmt.Errorf("Could not find address for server id %v", id) + } + return raft.ServerAddress(val.(string)), nil +} diff --git a/agent/consul/server_address_lookup_test.go b/agent/consul/server_address_lookup_test.go new file mode 100644 index 0000000000..e76725b564 --- /dev/null +++ b/agent/consul/server_address_lookup_test.go @@ -0,0 +1,30 @@ +package consul + +import ( + "fmt" + "testing" +) + +func TestServerAddressLookup(t *testing.T) { + lookup := NewServerAddressLookup() + addr := "72.0.0.17:8300" + lookup.AddServer("1", addr) + + got, err := lookup.ServerAddr("1") + if err != nil { + t.Fatalf("Unexpected error:%v", err) + } + if string(got) != addr { + t.Fatalf("Expected %v but got %v", addr, got) + } + + lookup.RemoveServer("1") + + got, err = lookup.ServerAddr("1") + expectedErr := fmt.Errorf("Could not find address for server id 1") + if expectedErr.Error() != err.Error() { + t.Fatalf("Unexpected error, got %v wanted %v", err, expectedErr) + } + + lookup.RemoveServer("3") +} diff --git a/agent/router/manager.go b/agent/router/manager.go index efa747c85f..d82ceb6c51 100644 --- a/agent/router/manager.go +++ b/agent/router/manager.go @@ -79,9 +79,6 @@ type Manager struct { listValue atomic.Value listLock sync.Mutex - // idToAddress provides lookup of server address by id, and is maintained alongside listValue - idToAddress atomic.Value - // rebalanceTimer controls the duration of the rebalance interval rebalanceTimer *time.Timer @@ -226,22 +223,10 @@ func (m *Manager) getServerList() serverList { return m.listValue.Load().(serverList) } -// GetServerAddress by ID returns a server address based on the id -func (m *Manager) GetServerAddressByID(id string) string { - idAddrMap := m.idToAddress.Load().(map[string]string) - addr, ok := idAddrMap[id] - if !ok { - m.logger.Printf("[WARN] Unable to find address for node id %v", id) - return "" - } - return addr -} - // saveServerList is a convenience method which hides the locking semantics // of atomic.Value from the caller. func (m *Manager) saveServerList(l serverList) { m.listValue.Store(l) - m.idToAddress.Store(makeIdAddrMap(l)) } func makeIdAddrMap(list serverList) map[string]string { diff --git a/agent/router/router.go b/agent/router/router.go index c0a48e623f..c41a6a79ce 100644 --- a/agent/router/router.go +++ b/agent/router/router.go @@ -489,28 +489,3 @@ func (r *Router) GetDatacenterMaps() ([]structs.DatacenterMap, error) { } return maps, nil } - -func (r *Router) GetServerAddressByID(datacenter string, id string) (string, error) { - r.RLock() - defer r.RUnlock() - - // Get the list of managers for this datacenter. This will usually just - // have one entry, but it's possible to have a user-defined area + WAN. - managers, ok := r.managers[datacenter] - if !ok { - return "", fmt.Errorf("datacenter %v not found", datacenter) - } - - // loop over all the managers till we find a matching address for the id - // there could be more than for if network areas are configured - for _, manager := range managers { - if manager.IsOffline() { - continue - } - id := manager.GetServerAddressByID(id) - if id != "" { - return id, nil - } - } - return "", fmt.Errorf("Unable to match id %v to any known servers ", id) -}