From 7a451f728e1654300d90d0a24feb98e285865872 Mon Sep 17 00:00:00 2001 From: James Phillips Date: Wed, 15 Mar 2017 16:11:19 -0700 Subject: [PATCH] Makes the flood goroutine more reusable. --- consul/flood.go | 68 ++++++++++++++++++++++++++++++++++ consul/serf.go | 7 +--- consul/server.go | 48 ++++++------------------ consul/servers/serf_flooder.go | 6 ++- 4 files changed, 86 insertions(+), 43 deletions(-) create mode 100644 consul/flood.go diff --git a/consul/flood.go b/consul/flood.go new file mode 100644 index 0000000000..60fe0dfd4d --- /dev/null +++ b/consul/flood.go @@ -0,0 +1,68 @@ +package consul + +import ( + "time" + + "github.com/hashicorp/consul/consul/servers" + "github.com/hashicorp/serf/serf" +) + +// FloodNotify lets all the waiting Flood goroutines know that some change may +// have affected them. +func (s *Server) FloodNotify() { + s.floodLock.RLock() + defer s.floodLock.RUnlock() + + for _, ch := range s.floodCh { + select { + case ch <- struct{}{}: + default: + } + } +} + +// Flood is a long-running goroutine that floods servers from the LAN to the +// given global Serf instance, such as the WAN. This will exit once either of +// the Serf instances are shut down. +func (s *Server) Flood(portFn servers.FloodPortFn, global *serf.Serf) { + s.floodLock.Lock() + floodCh := make(chan struct{}) + s.floodCh = append(s.floodCh, floodCh) + s.floodLock.Unlock() + + ticker := time.NewTicker(s.config.SerfFloodInterval) + defer ticker.Stop() + defer func() { + s.floodLock.Lock() + defer s.floodLock.Unlock() + + for i, ch := range s.floodCh { + if ch == floodCh { + s.floodCh = append(s.floodCh[:i], s.floodCh[i+1:]...) + return + } + } + panic("flood channels out of sync") + }() + + for { + WAIT: + select { + case <-s.serfLAN.ShutdownCh(): + return + + case <-global.ShutdownCh(): + return + + case <-ticker.C: + goto FLOOD + + case <-floodCh: + goto FLOOD + } + goto WAIT + + FLOOD: + servers.FloodJoins(s.logger, portFn, s.config.Datacenter, s.serfLAN, global) + } +} diff --git a/consul/serf.go b/consul/serf.go index 4f00d564a8..5492da5512 100644 --- a/consul/serf.go +++ b/consul/serf.go @@ -143,11 +143,8 @@ func (s *Server) lanNodeJoin(me serf.MemberEvent) { s.maybeBootstrap() } - // Kick the WAN flooder. - select { - case s.floodCh <- struct{}{}: - default: - } + // Kick the join flooders. + s.FloodNotify() } } diff --git a/consul/server.go b/consul/server.go index df24cdc99b..02bb08ec0e 100644 --- a/consul/server.go +++ b/consul/server.go @@ -156,9 +156,9 @@ type Server struct { // which SHOULD only consist of Consul servers serfWAN *serf.Serf - // floodCh is kicked whenever we should try to flood LAN servers into - // the WAN. - floodCh chan struct{} + // floodLock controls access to floodCh. + floodLock sync.RWMutex + floodCh []chan struct{} // sessionTimers track the expiration time of each Session that has // a TTL. On expiration, a SessionDestroy event will occur, and @@ -258,7 +258,6 @@ func NewServer(config *Config) (*Server, error) { router: servers.NewRouter(logger, shutdownCh, config.Datacenter), rpcServer: rpc.NewServer(), rpcTLS: incomingTLS, - floodCh: make(chan struct{}), tombstoneGC: gc, shutdownCh: make(chan struct{}), } @@ -318,40 +317,15 @@ func NewServer(config *Config) (*Server, error) { } go servers.HandleSerfEvents(s.logger, s.router, types.AreaWAN, s.serfWAN.ShutdownCh(), s.eventChWAN) - // Fire up the LAN <-> WAN Serf join flooder. - go func() { - ticker := time.NewTicker(config.SerfFloodInterval) - defer ticker.Stop() - - portFn := func(s *agent.Server) (int, bool) { - if s.WanJoinPort > 0 { - return s.WanJoinPort, true - } else { - return 0, false - } + // Fire up the LAN <-> WAN join flooder. + portFn := func(s *agent.Server) (int, bool) { + if s.WanJoinPort > 0 { + return s.WanJoinPort, true + } else { + return 0, false } - - for { - WAIT: - select { - case <-s.serfLAN.ShutdownCh(): - return - - case <-s.serfWAN.ShutdownCh(): - return - - case <-ticker.C: - goto FLOOD - - case <-s.floodCh: - goto FLOOD - } - goto WAIT - - FLOOD: - servers.FloodJoins(s.logger, portFn, config.Datacenter, s.serfLAN, s.serfWAN) - } - }() + } + go s.Flood(portFn, s.serfWAN) // Start monitoring leadership. This must happen after Serf is set up // since it can fire events when leadership is obtained. diff --git a/consul/servers/serf_flooder.go b/consul/servers/serf_flooder.go index 5134ec6777..f9242ad94c 100644 --- a/consul/servers/serf_flooder.go +++ b/consul/servers/serf_flooder.go @@ -72,9 +72,13 @@ func FloodJoins(logger *log.Logger, portFn FloodPortFn, } // Do the join! - if _, err := globalSerf.Join([]string{addr}, true); err != nil { + n, err := globalSerf.Join([]string{addr}, true) + if err != nil { logger.Printf("[DEBUG] consul: Failed to flood-join %q at %s: %v", server.Name, addr, err) + } else if n > 0 { + logger.Printf("[INFO] consul: Successfully performed flood-join for %q at %s", + server.Name, addr) } } }