consul/agent/consul/flood.go

68 lines
1.4 KiB
Go

// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: MPL-2.0
package consul
import (
"time"
"github.com/hashicorp/consul/agent/router"
"github.com/hashicorp/serf/serf"
)
// FloodNotify lets all the waiting Flood goroutines know that some change may
// have affected them.
func (s *Server) FloodNotify() {
s.floodLock.RLock()
defer s.floodLock.RUnlock()
for _, ch := range s.floodCh {
select {
case ch <- struct{}{}:
default:
}
}
}
// Flood is a long-running goroutine that floods servers from the LAN to the
// given global Serf instance, such as the WAN. This will exit once either of
// the Serf instances are shut down.
func (s *Server) Flood(addrFn router.FloodAddrFn, dstSerf *serf.Serf) {
s.floodLock.Lock()
floodCh := make(chan struct{})
s.floodCh = append(s.floodCh, floodCh)
s.floodLock.Unlock()
ticker := time.NewTicker(s.config.SerfFloodInterval)
defer ticker.Stop()
defer func() {
s.floodLock.Lock()
defer s.floodLock.Unlock()
for i, ch := range s.floodCh {
if ch == floodCh {
s.floodCh = append(s.floodCh[:i], s.floodCh[i+1:]...)
return
}
}
panic("flood channels out of sync")
}()
for {
select {
case <-s.serfLAN.ShutdownCh():
return
case <-dstSerf.ShutdownCh():
return
case <-ticker.C:
router.FloodJoins(s.logger, addrFn, s.config.Datacenter, s.serfLAN, dstSerf)
case <-floodCh:
router.FloodJoins(s.logger, addrFn, s.config.Datacenter, s.serfLAN, dstSerf)
}
}
}