mirror of https://github.com/status-im/consul.git
Remove server address tracking logic from manager/router and maintain it as part of lan event listener instead. Used sync.Map to track this, and added unit tests
This commit is contained in:
parent
830aca958a
commit
00836a6aab
|
@ -143,6 +143,9 @@ func (s *Server) lanNodeJoin(me serf.MemberEvent) {
|
|||
s.maybeBootstrap()
|
||||
}
|
||||
|
||||
// Update id to address map
|
||||
s.serverAddressLookup.AddServer(parts.ID, parts.Addr.String())
|
||||
|
||||
// Kick the join flooders.
|
||||
s.FloodNotify()
|
||||
}
|
||||
|
@ -274,5 +277,8 @@ func (s *Server) lanNodeFailed(me serf.MemberEvent) {
|
|||
s.localLock.Lock()
|
||||
delete(s.localConsuls, raft.ServerAddress(parts.Addr.String()))
|
||||
s.localLock.Unlock()
|
||||
|
||||
// Update id to address map
|
||||
s.serverAddressLookup.RemoveServer(parts.ID)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -171,6 +171,9 @@ type Server struct {
|
|||
// which SHOULD only consist of Consul servers
|
||||
serfWAN *serf.Serf
|
||||
|
||||
// fast lookup from id to server address to provide to the raft transport layer
|
||||
serverAddressLookup *ServerAddressLookup
|
||||
|
||||
// floodLock controls access to floodCh.
|
||||
floodLock sync.RWMutex
|
||||
floodCh []chan struct{}
|
||||
|
@ -286,6 +289,7 @@ func NewServerLogger(config *Config, logger *log.Logger, tokens *token.Store) (*
|
|||
ForceTLS: config.VerifyOutgoing,
|
||||
}
|
||||
|
||||
//serverAddrLookup = NewServerAddressLookup()
|
||||
// Create server.
|
||||
s := &Server{
|
||||
autopilotRemoveDeadCh: make(chan struct{}),
|
||||
|
@ -304,6 +308,7 @@ func NewServerLogger(config *Config, logger *log.Logger, tokens *token.Store) (*
|
|||
reassertLeaderCh: make(chan chan error),
|
||||
sessionTimers: NewSessionTimers(),
|
||||
tombstoneGC: gc,
|
||||
serverAddressLookup: NewServerAddressLookup(),
|
||||
shutdownCh: shutdownCh,
|
||||
}
|
||||
|
||||
|
@ -494,7 +499,12 @@ func (s *Server) setupRaft() error {
|
|||
}
|
||||
|
||||
// Create a transport layer.
|
||||
transConfig := &raft.NetworkTransportConfig{Stream: s.raftLayer, MaxPool: 3, Timeout: 10 * time.Second, ServerAddressProvider: s}
|
||||
transConfig := &raft.NetworkTransportConfig{
|
||||
Stream: s.raftLayer,
|
||||
MaxPool: 3,
|
||||
Timeout: 10 * time.Second,
|
||||
ServerAddressProvider: s.serverAddressLookup,
|
||||
}
|
||||
|
||||
trans := raft.NewNetworkTransportWithConfig(transConfig)
|
||||
s.raftTransport = trans
|
||||
|
@ -1049,17 +1059,6 @@ func (s *Server) GetWANCoordinate() (*coordinate.Coordinate, error) {
|
|||
return s.serfWAN.GetCoordinate()
|
||||
}
|
||||
|
||||
func (s *Server) ServerAddr(id raft.ServerID) (raft.ServerAddress, error) {
|
||||
if string(id) == string(s.config.NodeID) {
|
||||
return raft.ServerAddress(s.config.RPCAddr.String()), nil
|
||||
}
|
||||
addr, err := s.router.GetServerAddressByID(s.config.Datacenter, string(id))
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
return raft.ServerAddress(addr), nil
|
||||
}
|
||||
|
||||
// Atomically sets a readiness state flag when leadership is obtained, to indicate that server is past its barrier write
|
||||
func (s *Server) setConsistentReadReady() {
|
||||
atomic.StoreInt32(&s.readyForConsistentReads, 1)
|
||||
|
|
|
@ -0,0 +1,34 @@
|
|||
package consul
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sync"
|
||||
|
||||
"github.com/hashicorp/raft"
|
||||
)
|
||||
|
||||
// serverIdToAddress is a map from id to address for servers in the LAN pool.
|
||||
// used for fast lookup to satisfy the ServerAddressProvider interface
|
||||
type ServerAddressLookup struct {
|
||||
serverIdToAddress sync.Map
|
||||
}
|
||||
|
||||
func NewServerAddressLookup() *ServerAddressLookup {
|
||||
return &ServerAddressLookup{}
|
||||
}
|
||||
|
||||
func (sa *ServerAddressLookup) AddServer(id string, address string) {
|
||||
sa.serverIdToAddress.Store(id, address)
|
||||
}
|
||||
|
||||
func (sa *ServerAddressLookup) RemoveServer(id string) {
|
||||
sa.serverIdToAddress.Delete(id)
|
||||
}
|
||||
|
||||
func (sa *ServerAddressLookup) ServerAddr(id raft.ServerID) (raft.ServerAddress, error) {
|
||||
val, ok := sa.serverIdToAddress.Load(string(id))
|
||||
if !ok {
|
||||
return "", fmt.Errorf("Could not find address for server id %v", id)
|
||||
}
|
||||
return raft.ServerAddress(val.(string)), nil
|
||||
}
|
|
@ -0,0 +1,30 @@
|
|||
package consul
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestServerAddressLookup(t *testing.T) {
|
||||
lookup := NewServerAddressLookup()
|
||||
addr := "72.0.0.17:8300"
|
||||
lookup.AddServer("1", addr)
|
||||
|
||||
got, err := lookup.ServerAddr("1")
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error:%v", err)
|
||||
}
|
||||
if string(got) != addr {
|
||||
t.Fatalf("Expected %v but got %v", addr, got)
|
||||
}
|
||||
|
||||
lookup.RemoveServer("1")
|
||||
|
||||
got, err = lookup.ServerAddr("1")
|
||||
expectedErr := fmt.Errorf("Could not find address for server id 1")
|
||||
if expectedErr.Error() != err.Error() {
|
||||
t.Fatalf("Unexpected error, got %v wanted %v", err, expectedErr)
|
||||
}
|
||||
|
||||
lookup.RemoveServer("3")
|
||||
}
|
|
@ -79,9 +79,6 @@ type Manager struct {
|
|||
listValue atomic.Value
|
||||
listLock sync.Mutex
|
||||
|
||||
// idToAddress provides lookup of server address by id, and is maintained alongside listValue
|
||||
idToAddress atomic.Value
|
||||
|
||||
// rebalanceTimer controls the duration of the rebalance interval
|
||||
rebalanceTimer *time.Timer
|
||||
|
||||
|
@ -226,22 +223,10 @@ func (m *Manager) getServerList() serverList {
|
|||
return m.listValue.Load().(serverList)
|
||||
}
|
||||
|
||||
// GetServerAddress by ID returns a server address based on the id
|
||||
func (m *Manager) GetServerAddressByID(id string) string {
|
||||
idAddrMap := m.idToAddress.Load().(map[string]string)
|
||||
addr, ok := idAddrMap[id]
|
||||
if !ok {
|
||||
m.logger.Printf("[WARN] Unable to find address for node id %v", id)
|
||||
return ""
|
||||
}
|
||||
return addr
|
||||
}
|
||||
|
||||
// saveServerList is a convenience method which hides the locking semantics
|
||||
// of atomic.Value from the caller.
|
||||
func (m *Manager) saveServerList(l serverList) {
|
||||
m.listValue.Store(l)
|
||||
m.idToAddress.Store(makeIdAddrMap(l))
|
||||
}
|
||||
|
||||
func makeIdAddrMap(list serverList) map[string]string {
|
||||
|
|
|
@ -489,28 +489,3 @@ func (r *Router) GetDatacenterMaps() ([]structs.DatacenterMap, error) {
|
|||
}
|
||||
return maps, nil
|
||||
}
|
||||
|
||||
func (r *Router) GetServerAddressByID(datacenter string, id string) (string, error) {
|
||||
r.RLock()
|
||||
defer r.RUnlock()
|
||||
|
||||
// Get the list of managers for this datacenter. This will usually just
|
||||
// have one entry, but it's possible to have a user-defined area + WAN.
|
||||
managers, ok := r.managers[datacenter]
|
||||
if !ok {
|
||||
return "", fmt.Errorf("datacenter %v not found", datacenter)
|
||||
}
|
||||
|
||||
// loop over all the managers till we find a matching address for the id
|
||||
// there could be more than for if network areas are configured
|
||||
for _, manager := range managers {
|
||||
if manager.IsOffline() {
|
||||
continue
|
||||
}
|
||||
id := manager.GetServerAddressByID(id)
|
||||
if id != "" {
|
||||
return id, nil
|
||||
}
|
||||
}
|
||||
return "", fmt.Errorf("Unable to match id %v to any known servers ", id)
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue