Sync some feature flag support from enterprise (#7167)

This commit is contained in:
Matt Keeler 2020-01-29 13:21:38 -05:00 committed by GitHub
parent d78b5008ce
commit 61d8778210
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 103 additions and 0 deletions

View File

@ -60,6 +60,8 @@ func (c *Client) setupSerf(conf *serf.Config, ch chan serf.Event, path string) (
return nil, err
}
c.addEnterpriseSerfTags(conf.Tags)
return serf.Create(conf)
}
@ -85,6 +87,7 @@ func (c *Client) lanEventHandler() {
case serf.EventUser:
c.localEvent(e.(serf.UserEvent))
case serf.EventMemberUpdate: // Ignore
c.nodeUpdate(e.(serf.MemberEvent))
case serf.EventQuery: // Ignore
default:
c.logger.Warn("unhandled LAN Serf Event", "event", e)
@ -119,6 +122,25 @@ func (c *Client) nodeJoin(me serf.MemberEvent) {
}
}
// nodeUpdate is used to handle update events on the serf cluster
func (c *Client) nodeUpdate(me serf.MemberEvent) {
for _, m := range me.Members {
ok, parts := metadata.IsConsulServer(m)
if !ok {
continue
}
if parts.Datacenter != c.config.Datacenter {
c.logger.Warn("server has joined the wrong cluster: wrong datacenter",
"server", m.Name,
"datacenter", parts.Datacenter,
)
continue
}
c.logger.Info("updating server", "server", parts.String())
c.routers.AddServer(parts)
}
}
// nodeFail is used to handle fail events on the serf cluster
func (c *Client) nodeFail(me serf.MemberEvent) {
for _, m := range me.Members {

View File

@ -23,3 +23,7 @@ func (c *Client) handleEnterpriseUserEvents(event serf.UserEvent) bool {
func (c *Client) enterpriseStats() map[string]map[string]string {
return nil
}
func (_ *Client) addEnterpriseSerfTags(_ map[string]string) {
// do nothing
}

View File

@ -54,3 +54,7 @@ func (s *Server) revokeEnterpriseLeadership() error {
func (s *Server) validateEnterpriseRequest(entMeta *structs.EnterpriseMeta, write bool) error {
return nil
}
func (_ *Server) addEnterpriseSerfTags(_ map[string]string) {
// do nothing
}

View File

@ -118,6 +118,8 @@ func (s *Server) setupSerf(conf *serf.Config, ch chan serf.Event, path string, w
return nil, err
}
s.addEnterpriseSerfTags(conf.Tags)
return serf.Create(conf)
}
@ -241,6 +243,19 @@ func (s *Server) lanNodeJoin(me serf.MemberEvent) {
}
}
func (s *Server) lanNodeUpdate(me serf.MemberEvent) {
for _, m := range me.Members {
ok, serverMeta := metadata.IsConsulServer(m)
if !ok || serverMeta.Segment != "" {
continue
}
s.logger.Info("Updating LAN server", "server", serverMeta.String())
// Update server lookup
s.serverLookup.AddServer(serverMeta)
}
}
// maybeBootstrap is used to handle bootstrapping when a new consul server joins.
func (s *Server) maybeBootstrap() {
// Bootstrap can only be done if there are no committed logs, remove our

View File

@ -41,6 +41,7 @@ type Server struct {
Status serf.MemberStatus
NonVoter bool
ACLs structs.ACLMode
FeatureFlags map[string]int
// If true, use TLS when connecting to this server
UseTLS bool
@ -103,6 +104,7 @@ func IsConsulServer(m serf.Member) (bool, *Server) {
segmentAddrs := make(map[string]string)
segmentPorts := make(map[string]int)
featureFlags := make(map[string]int)
for name, value := range m.Tags {
if strings.HasPrefix(name, "sl_") {
addr, port, err := net.SplitHostPort(value)
@ -117,6 +119,13 @@ func IsConsulServer(m serf.Member) (bool, *Server) {
segmentName := strings.TrimPrefix(name, "sl_")
segmentAddrs[segmentName] = addr
segmentPorts[segmentName] = segmentPort
} else if strings.HasPrefix(name, "ft_") {
featureName := strings.TrimPrefix(name, "ft_")
featureState, err := strconv.Atoi(value)
if err != nil {
return false, nil
}
featureFlags[featureName] = featureState
}
}
@ -173,6 +182,7 @@ func IsConsulServer(m serf.Member) (bool, *Server) {
UseTLS: useTLS,
NonVoter: nonVoter,
ACLs: acls,
FeatureFlags: featureFlags,
}
return true, parts
}

View File

@ -149,6 +149,18 @@ func (m *Manager) AddServer(s *metadata.Server) {
m.saveServerList(l)
}
// UpdateTLS updates the TLS setting for the servers in this manager
func (m *Manager) UpdateTLS(useTLS bool) {
m.listLock.Lock()
defer m.listLock.Unlock()
list := m.getServerList()
for _, server := range list.servers {
server.UseTLS = useTLS
}
m.saveServerList(list)
}
// cycleServers returns a new list of servers that has dequeued the first
// server and enqueued it at the end of the list. cycleServers assumes the
// caller is holding the listLock. cycleServer does not test or ping
@ -218,6 +230,19 @@ func (m *Manager) FindServer() *metadata.Server {
return l.servers[0]
}
func (m *Manager) checkServers(fn func(srv *metadata.Server) bool) bool {
for _, srv := range m.getServerList().servers {
if !fn(srv) {
return false
}
}
return true
}
func (m *Manager) CheckServers(fn func(srv *metadata.Server) bool) {
_ = m.checkServers(fn)
}
// getServerList is a convenience method which hides the locking semantics
// of atomic.Value from the caller.
func (m *Manager) getServerList() serverList {

View File

@ -348,6 +348,28 @@ func (r *Router) findDirectRoute(datacenter string) (*Manager, *metadata.Server,
return nil, nil, false
}
// CheckServers returns thwo things
// 1. bool to indicate whether any servers were processed
// 2. error if any propagated from the fn
//
// The fn called should return a bool indicating whether checks should continue and an error
// If an error is returned then checks will stop immediately
func (r *Router) CheckServers(dc string, fn func(srv *metadata.Server) bool) {
r.RLock()
defer r.RUnlock()
managers, ok := r.managers[dc]
if !ok {
return
}
for _, m := range managers {
if !m.checkServers(fn) {
return
}
}
}
// GetDatacenters returns a list of datacenters known to the router, sorted by
// name.
func (r *Router) GetDatacenters() []string {

View File

@ -69,6 +69,7 @@ func HandleSerfEvents(logger hclog.Logger, router *Router, areaID types.AreaID,
// All of these event types are ignored.
case serf.EventMemberUpdate:
handleMemberEvent(logger, router.AddServer, areaID, e)
case serf.EventUser:
case serf.EventQuery: