Consolidate server lookup into one place and replace usages of localConsuls.

This commit is contained in:
Preetha Appan 2017-08-29 22:35:22 -05:00
parent 0f418a1bcf
commit 0f4e24f72c
6 changed files with 84 additions and 54 deletions

View File

@ -238,9 +238,7 @@ func (s *Server) getLeader() (bool, *metadata.Server) {
}
// Lookup the server
s.localLock.RLock()
server := s.localConsuls[leader]
s.localLock.RUnlock()
server, _ := s.serverLookup.GetServer(leader)
// Server could be nil
return false, server

View File

@ -125,18 +125,14 @@ func (s *Server) localEvent(event serf.UserEvent) {
// lanNodeJoin is used to handle join events on the LAN pool.
func (s *Server) lanNodeJoin(me serf.MemberEvent) {
for _, m := range me.Members {
ok, parts := metadata.IsConsulServer(m)
ok, serverMeta := metadata.IsConsulServer(m)
if !ok {
continue
}
s.logger.Printf("[INFO] consul: Adding LAN server %s", parts)
s.logger.Printf("[INFO] consul: Adding LAN server %s", serverMeta)
// See if it's configured as part of our DC.
if parts.Datacenter == s.config.Datacenter {
s.localLock.Lock()
s.localConsuls[raft.ServerAddress(parts.Addr.String())] = parts
s.localLock.Unlock()
}
// Update server lookup
s.serverLookup.AddServer(serverMeta)
// If we're still expecting to bootstrap, may need to handle this.
if s.config.BootstrapExpect != 0 {
@ -144,7 +140,7 @@ func (s *Server) lanNodeJoin(me serf.MemberEvent) {
}
// Update id to address map
s.serverAddressLookup.AddServer(parts.ID, parts.Addr.String())
s.serverLookup.AddServer(serverMeta)
// Kick the join flooders.
s.FloodNotify()
@ -274,11 +270,7 @@ func (s *Server) lanNodeFailed(me serf.MemberEvent) {
}
s.logger.Printf("[INFO] consul: Removing LAN server %s", parts)
s.localLock.Lock()
delete(s.localConsuls, raft.ServerAddress(parts.Addr.String()))
s.localLock.Unlock()
// Update id to address map
s.serverAddressLookup.RemoveServer(parts.ID)
s.serverLookup.RemoveServer(parts)
}
}

View File

@ -123,11 +123,6 @@ type Server struct {
// strong consistency.
fsm *consulFSM
// localConsuls is used to track the known consuls
// in the local datacenter. Used to do leader forwarding.
localConsuls map[raft.ServerAddress]*metadata.Server
localLock sync.RWMutex
// Logger uses the provided LogOutput
logger *log.Logger
@ -171,8 +166,8 @@ type Server struct {
// which SHOULD only consist of Consul servers
serfWAN *serf.Serf
// fast lookup from id to server address to provide to the raft transport layer
serverAddressLookup *ServerAddressLookup
// serverLookup provides fast and thread-safe lookup by id and address
serverLookup *ServerLookup
// floodLock controls access to floodCh.
floodLock sync.RWMutex
@ -298,7 +293,6 @@ func NewServerLogger(config *Config, logger *log.Logger, tokens *token.Store) (*
connPool: connPool,
eventChLAN: make(chan serf.Event, 256),
eventChWAN: make(chan serf.Event, 256),
localConsuls: make(map[raft.ServerAddress]*metadata.Server),
logger: logger,
reconcileCh: make(chan serf.Member, 32),
router: router.NewRouter(logger, config.Datacenter),
@ -307,7 +301,7 @@ func NewServerLogger(config *Config, logger *log.Logger, tokens *token.Store) (*
reassertLeaderCh: make(chan chan error),
sessionTimers: NewSessionTimers(),
tombstoneGC: gc,
serverAddressLookup: NewServerAddressLookup(),
serverLookup: NewServerLookup(),
shutdownCh: shutdownCh,
}
@ -502,7 +496,7 @@ func (s *Server) setupRaft() error {
Stream: s.raftLayer,
MaxPool: 3,
Timeout: 10 * time.Second,
ServerAddressProvider: s.serverAddressLookup,
ServerAddressProvider: s.serverLookup,
}
trans := raft.NewNetworkTransportWithConfig(transConfig)
@ -705,9 +699,7 @@ func (s *Server) setupRPC(tlsWrap tlsutil.DCWrapper) error {
return true
}
s.localLock.RLock()
server, ok := s.localConsuls[address]
s.localLock.RUnlock()
server, ok := s.serverLookup.GetServer(address)
if !ok {
return false

View File

@ -4,31 +4,53 @@ import (
"fmt"
"sync"
"github.com/hashicorp/consul/agent/metadata"
"github.com/hashicorp/raft"
)
// serverIdToAddress is a map from id to address for servers in the LAN pool.
// used for fast lookup to satisfy the ServerAddressProvider interface
type ServerAddressLookup struct {
serverIdToAddress sync.Map
// ServerLookup encapsulates looking up servers by id and address
type ServerLookup struct {
lock sync.RWMutex
addressToServer map[raft.ServerAddress]*metadata.Server
IdToServer map[raft.ServerID]*metadata.Server
}
func NewServerAddressLookup() *ServerAddressLookup {
return &ServerAddressLookup{}
func NewServerLookup() *ServerLookup {
return &ServerLookup{addressToServer: make(map[raft.ServerAddress]*metadata.Server), IdToServer: make(map[raft.ServerID]*metadata.Server)}
}
func (sa *ServerAddressLookup) AddServer(id string, address string) {
sa.serverIdToAddress.Store(id, address)
func (sa *ServerLookup) AddServer(server *metadata.Server) {
sa.lock.Lock()
defer sa.lock.Unlock()
sa.addressToServer[raft.ServerAddress(server.Addr.String())] = server
sa.IdToServer[raft.ServerID(server.ID)] = server
}
func (sa *ServerAddressLookup) RemoveServer(id string) {
sa.serverIdToAddress.Delete(id)
func (sa *ServerLookup) RemoveServer(server *metadata.Server) {
sa.lock.Lock()
defer sa.lock.Unlock()
delete(sa.addressToServer, raft.ServerAddress(server.Addr.String()))
delete(sa.IdToServer, raft.ServerID(server.ID))
}
func (sa *ServerAddressLookup) ServerAddr(id raft.ServerID) (raft.ServerAddress, error) {
val, ok := sa.serverIdToAddress.Load(string(id))
// Implements the ServerAddressProvider interface
func (sa *ServerLookup) ServerAddr(id raft.ServerID) (raft.ServerAddress, error) {
sa.lock.RLock()
defer sa.lock.RUnlock()
svr, ok := sa.IdToServer[id]
if !ok {
return "", fmt.Errorf("Could not find address for server id %v", id)
}
return raft.ServerAddress(val.(string)), nil
return raft.ServerAddress(svr.Addr.String()), nil
}
// GetServer looks up the server by address, returns a boolean if not found
func (sa *ServerLookup) GetServer(addr raft.ServerAddress) (*metadata.Server, bool) {
sa.lock.RLock()
defer sa.lock.RUnlock()
svr, ok := sa.addressToServer[addr]
if !ok {
return nil, false
}
return svr, true
}

View File

@ -3,14 +3,32 @@ package consul
import (
"fmt"
"testing"
"github.com/hashicorp/consul/agent/metadata"
"github.com/hashicorp/raft"
)
func TestServerAddressLookup(t *testing.T) {
lookup := NewServerAddressLookup()
addr := "72.0.0.17:8300"
lookup.AddServer("1", addr)
type testAddr struct {
addr string
}
got, err := lookup.ServerAddr("1")
func (ta *testAddr) Network() string {
return "tcp"
}
func (ta *testAddr) String() string {
return ta.addr
}
func TestServerLookup(t *testing.T) {
lookup := NewServerLookup()
addr := "72.0.0.17:8300"
id := "1"
svr := &metadata.Server{ID: id, Addr: &testAddr{addr}}
lookup.AddServer(svr)
got, err := lookup.ServerAddr(raft.ServerID(id))
if err != nil {
t.Fatalf("Unexpected error:%v", err)
}
@ -18,7 +36,15 @@ func TestServerAddressLookup(t *testing.T) {
t.Fatalf("Expected %v but got %v", addr, got)
}
lookup.RemoveServer("1")
server, ok := lookup.GetServer(raft.ServerAddress(addr))
if !ok {
t.Fatalf("Expected lookup to return true")
}
if server.Addr.String() != addr {
t.Fatalf("Expected lookup to return address %v but got %v", addr, server.Addr)
}
lookup.RemoveServer(svr)
got, err = lookup.ServerAddr("1")
expectedErr := fmt.Errorf("Could not find address for server id 1")
@ -26,5 +52,7 @@ func TestServerAddressLookup(t *testing.T) {
t.Fatalf("Unexpected error, got %v wanted %v", err, expectedErr)
}
lookup.RemoveServer("3")
svr2 := &metadata.Server{ID: "2", Addr: &testAddr{"123.4.5.6"}}
lookup.RemoveServer(svr2)
}

View File

@ -342,7 +342,7 @@ func TestServer_JoinSeparateLanAndWanAddresses(t *testing.T) {
if len(s2.router.GetDatacenters()) != 2 {
r.Fatalf("remote consul missing")
}
if len(s2.localConsuls) != 2 {
if len(s2.serverLookup.addressToServer) != 2 {
r.Fatalf("local consul fellow s3 for s2 missing")
}
})
@ -666,14 +666,12 @@ func testVerifyRPC(s1, s2 *Server, t *testing.T) (bool, error) {
retry.Run(t, func(r *retry.R) { r.Check(wantPeers(s2, 2)) })
// Have s2 make an RPC call to s1
s2.localLock.RLock()
var leader *metadata.Server
for _, server := range s2.localConsuls {
for _, server := range s2.serverLookup.addressToServer {
if server.Name == s1.config.NodeName {
leader = server
}
}
s2.localLock.RUnlock()
if leader == nil {
t.Fatal("no leader")
}