mirror of
https://github.com/status-im/consul.git
synced 2025-01-10 22:06:20 +00:00
parent
da9ab5985f
commit
798120d0c4
@ -278,7 +278,7 @@ func NewServerLogger(config *Config, logger *log.Logger) (*Server, error) {
|
||||
localConsuls: make(map[raft.ServerAddress]*agent.Server),
|
||||
logger: logger,
|
||||
reconcileCh: make(chan serf.Member, 32),
|
||||
router: servers.NewRouter(logger, shutdownCh, config.Datacenter),
|
||||
router: servers.NewRouter(logger, config.Datacenter),
|
||||
rpcServer: rpc.NewServer(),
|
||||
rpcTLS: incomingTLS,
|
||||
reassertLeaderCh: make(chan chan error),
|
||||
@ -686,6 +686,7 @@ func (s *Server) Shutdown() error {
|
||||
s.logger.Printf("[WARN] consul: error removing WAN area: %v", err)
|
||||
}
|
||||
}
|
||||
s.router.Shutdown()
|
||||
|
||||
if s.raft != nil {
|
||||
s.raftTransport.Close()
|
||||
|
@ -36,6 +36,10 @@ type Router struct {
|
||||
// routeFn is a hook to actually do the routing.
|
||||
routeFn func(datacenter string) (*Manager, *agent.Server, bool)
|
||||
|
||||
// isShutdown prevents adding new routes to a router after it is shut
|
||||
// down.
|
||||
isShutdown bool
|
||||
|
||||
// This top-level lock covers all the internal state.
|
||||
sync.RWMutex
|
||||
}
|
||||
@ -74,9 +78,8 @@ type areaInfo struct {
|
||||
managers map[string]*managerInfo
|
||||
}
|
||||
|
||||
// NewRouter returns a new Router with the given configuration. This will also
|
||||
// spawn a goroutine that cleans up when the given shutdownCh is closed.
|
||||
func NewRouter(logger *log.Logger, shutdownCh chan struct{}, localDatacenter string) *Router {
|
||||
// NewRouter returns a new Router with the given configuration.
|
||||
func NewRouter(logger *log.Logger, localDatacenter string) *Router {
|
||||
router := &Router{
|
||||
logger: logger,
|
||||
localDatacenter: localDatacenter,
|
||||
@ -87,23 +90,25 @@ func NewRouter(logger *log.Logger, shutdownCh chan struct{}, localDatacenter str
|
||||
// Hook the direct route lookup by default.
|
||||
router.routeFn = router.findDirectRoute
|
||||
|
||||
// This will propagate a top-level shutdown to all the managers.
|
||||
go func() {
|
||||
<-shutdownCh
|
||||
router.Lock()
|
||||
defer router.Unlock()
|
||||
return router
|
||||
}
|
||||
|
||||
for _, area := range router.areas {
|
||||
for _, info := range area.managers {
|
||||
close(info.shutdownCh)
|
||||
}
|
||||
// Shutdown removes all areas from the router, which stops all their respective
|
||||
// managers. No new areas can be added after the router is shut down.
|
||||
func (r *Router) Shutdown() {
|
||||
r.Lock()
|
||||
defer r.Unlock()
|
||||
|
||||
for areaID, area := range r.areas {
|
||||
for datacenter, info := range area.managers {
|
||||
r.removeManagerFromIndex(datacenter, info.manager)
|
||||
close(info.shutdownCh)
|
||||
}
|
||||
|
||||
router.areas = nil
|
||||
router.managers = nil
|
||||
}()
|
||||
delete(r.areas, areaID)
|
||||
}
|
||||
|
||||
return router
|
||||
r.isShutdown = true
|
||||
}
|
||||
|
||||
// AddArea registers a new network area with the router.
|
||||
@ -111,6 +116,10 @@ func (r *Router) AddArea(areaID types.AreaID, cluster RouterSerfCluster, pinger
|
||||
r.Lock()
|
||||
defer r.Unlock()
|
||||
|
||||
if r.isShutdown {
|
||||
return fmt.Errorf("cannot add area, router is shut down")
|
||||
}
|
||||
|
||||
if _, ok := r.areas[areaID]; ok {
|
||||
return fmt.Errorf("area ID %q already exists", areaID)
|
||||
}
|
||||
|
@ -7,6 +7,7 @@ import (
|
||||
"os"
|
||||
"reflect"
|
||||
"sort"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
@ -95,8 +96,43 @@ func testCluster(self string) *mockCluster {
|
||||
|
||||
func testRouter(dc string) *Router {
|
||||
logger := log.New(os.Stderr, "", log.LstdFlags)
|
||||
shutdownCh := make(chan struct{})
|
||||
return NewRouter(logger, shutdownCh, dc)
|
||||
return NewRouter(logger, dc)
|
||||
}
|
||||
|
||||
func TestRouter_Shutdown(t *testing.T) {
|
||||
r := testRouter("dc0")
|
||||
|
||||
// Create a WAN-looking area.
|
||||
self := "node0.dc0"
|
||||
wan := testCluster(self)
|
||||
if err := r.AddArea(types.AreaWAN, wan, &fauxConnPool{}); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// Add another area.
|
||||
otherID := types.AreaID("other")
|
||||
other := newMockCluster(self)
|
||||
other.AddMember("dcY", "node1", nil)
|
||||
if err := r.AddArea(otherID, other, &fauxConnPool{}); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
_, _, ok := r.FindRoute("dcY")
|
||||
if !ok {
|
||||
t.Fatalf("bad")
|
||||
}
|
||||
|
||||
// Shutdown and make sure we can't see any routes from before.
|
||||
r.Shutdown()
|
||||
_, _, ok = r.FindRoute("dcY")
|
||||
if ok {
|
||||
t.Fatalf("bad")
|
||||
}
|
||||
|
||||
// You can't add areas once the router is shut down.
|
||||
err := r.AddArea(otherID, other, &fauxConnPool{})
|
||||
if err == nil || !strings.Contains(err.Error(), "router is shut down") {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestRouter_Routing(t *testing.T) {
|
||||
|
Loading…
x
Reference in New Issue
Block a user