From 995a24b8e4b89ce8282f9bfe804e4be8945c88c3 Mon Sep 17 00:00:00 2001 From: Hans Hasselberg Date: Mon, 27 Apr 2020 10:21:05 +0200 Subject: [PATCH] agent: refactor to use a single addrFn --- agent/consul/flood.go | 6 +-- agent/consul/server.go | 14 ++++--- agent/router/serf_flooder.go | 73 +++++++++++++++++++----------------- 3 files changed, 50 insertions(+), 43 deletions(-) diff --git a/agent/consul/flood.go b/agent/consul/flood.go index d8de89b324..ab9c540033 100644 --- a/agent/consul/flood.go +++ b/agent/consul/flood.go @@ -24,7 +24,7 @@ func (s *Server) FloodNotify() { // Flood is a long-running goroutine that floods servers from the LAN to the // given global Serf instance, such as the WAN. This will exit once either of // the Serf instances are shut down. -func (s *Server) Flood(addrFn router.FloodAddrFn, portFn router.FloodPortFn, dstSerf *serf.Serf) { +func (s *Server) Flood(addrFn router.FloodAddrFn, dstSerf *serf.Serf) { s.floodLock.Lock() floodCh := make(chan struct{}) s.floodCh = append(s.floodCh, floodCh) @@ -54,10 +54,10 @@ func (s *Server) Flood(addrFn router.FloodAddrFn, portFn router.FloodPortFn, dst return case <-ticker.C: - router.FloodJoins(s.logger, addrFn, portFn, s.config.Datacenter, s.serfLAN, dstSerf) + router.FloodJoins(s.logger, addrFn, s.config.Datacenter, s.serfLAN, dstSerf) case <-floodCh: - router.FloodJoins(s.logger, addrFn, portFn, s.config.Datacenter, s.serfLAN, dstSerf) + router.FloodJoins(s.logger, addrFn, s.config.Datacenter, s.serfLAN, dstSerf) } } diff --git a/agent/consul/server.go b/agent/consul/server.go index 3b6e9886ab..44026fe87b 100644 --- a/agent/consul/server.go +++ b/agent/consul/server.go @@ -558,13 +558,17 @@ func NewServerLogger(config *Config, logger hclog.InterceptLogger, tokens *token go router.HandleSerfEvents(s.logger, s.router, types.AreaWAN, s.serfWAN.ShutdownCh(), s.eventChWAN) // Fire up the LAN <-> WAN join flooder. - portFn := func(s *metadata.Server) (int, bool) { - if s.WanJoinPort > 0 { - return s.WanJoinPort, true + addrFn := func(s *metadata.Server) (string, error) { + if s.WanJoinPort == 0 { + return "", fmt.Errorf("no wan join port for server: %s", s.Addr.String()) } - return 0, false + addr, _, err := net.SplitHostPort(s.Addr.String()) + if err != nil { + return "", err + } + return fmt.Sprintf("%s:%d", addr, s.WanJoinPort), nil } - go s.Flood(nil, portFn, s.serfWAN) + go s.Flood(addrFn, s.serfWAN) } // Start enterprise specific functionality diff --git a/agent/router/serf_flooder.go b/agent/router/serf_flooder.go index 9a6f242f34..b7aa8e676c 100644 --- a/agent/router/serf_flooder.go +++ b/agent/router/serf_flooder.go @@ -2,7 +2,6 @@ package router import ( "fmt" - "net" "strings" "github.com/hashicorp/consul/agent/metadata" @@ -10,20 +9,16 @@ import ( "github.com/hashicorp/serf/serf" ) -// FloodAddrFn gets the address to use for a given server when flood-joining. This -// will return false if it doesn't have one. -type FloodAddrFn func(*metadata.Server) (string, bool) - -// FloodPortFn gets the port to use for a given server when flood-joining. This -// will return false if it doesn't have one. -type FloodPortFn func(*metadata.Server) (int, bool) +// FloodAddrPortFn gets the address and port to use for a given server when +// flood-joining. This will return false if it doesn't have one. +type FloodAddrFn func(*metadata.Server) (string, error) // FloodJoins attempts to make sure all Consul servers in the src Serf // instance are joined in the dst Serf instance. It assumes names in the // src area are of the form and those in the dst area are of the // form . as is done for WAN and general network areas in Consul // Enterprise. -func FloodJoins(logger hclog.Logger, addrFn FloodAddrFn, portFn FloodPortFn, +func FloodJoins(logger hclog.Logger, addrFn FloodAddrFn, localDatacenter string, srcSerf *serf.Serf, dstSerf *serf.Serf) { // Names in the dst Serf have the datacenter suffixed. @@ -65,37 +60,45 @@ func FloodJoins(logger hclog.Logger, addrFn FloodAddrFn, portFn FloodPortFn, // TODO refactor into one function: - // We can't use the port number from the src Serf, so we just - // get the host part. - addr, _, err := net.SplitHostPort(server.Addr.String()) + addr, err := addrFn(server) if err != nil { - logger.Debug("Failed to flood-join server (bad address)", - "server", server.Name, - "address", server.Addr.String(), + logger.Debug("Failed to flood-join server", "server", + server.Name, "address", server.Addr.String(), "error", err, ) + continue } - if addrFn != nil { - if a, ok := addrFn(server); ok { - addr = a - } - } + // // We can't use the port number from the src Serf, so we just + // // get the host part. + // addr, _, err := net.SplitHostPort(server.Addr.String()) + // if err != nil { + // logger.Debug("Failed to flood-join server (bad address)", + // "server", server.Name, + // "address", server.Addr.String(), + // "error", err, + // ) + // } + // if addrFn != nil { + // if a, ok := addrFn(server); ok { + // addr = a + // } + // } - // Let the callback see if it can get the port number, otherwise - // leave it blank to behave as if we just supplied an address. - if port, ok := portFn(server); ok { - addr = net.JoinHostPort(addr, fmt.Sprintf("%d", port)) - } else { - // If we have an IPv6 address, we should add brackets, - // single dstSerf.Join expects that. - if ip := net.ParseIP(addr); ip != nil { - if ip.To4() == nil { - addr = fmt.Sprintf("[%s]", addr) - } - } else { - logger.Debug("Failed to parse IP", "ip", addr) - } - } + // // Let the callback see if it can get the port number, otherwise + // // leave it blank to behave as if we just supplied an address. + // if port, ok := portFn(server); ok { + // addr = net.JoinHostPort(addr, fmt.Sprintf("%d", port)) + // } else { + // // If we have an IPv6 address, we should add brackets, + // // single dstSerf.Join expects that. + // if ip := net.ParseIP(addr); ip != nil { + // if ip.To4() == nil { + // addr = fmt.Sprintf("[%s]", addr) + // } + // } else { + // logger.Debug("Failed to parse IP", "ip", addr) + // } + // } // end refactor