diff --git a/agent/consul/client_serf.go b/agent/consul/client_serf.go index dc109abee6..52f18ccdfd 100644 --- a/agent/consul/client_serf.go +++ b/agent/consul/client_serf.go @@ -60,6 +60,8 @@ func (c *Client) setupSerf(conf *serf.Config, ch chan serf.Event, path string) ( return nil, err } + c.addEnterpriseSerfTags(conf.Tags) + return serf.Create(conf) } @@ -85,6 +87,7 @@ func (c *Client) lanEventHandler() { case serf.EventUser: c.localEvent(e.(serf.UserEvent)) case serf.EventMemberUpdate: // Ignore + c.nodeUpdate(e.(serf.MemberEvent)) case serf.EventQuery: // Ignore default: c.logger.Warn("unhandled LAN Serf Event", "event", e) @@ -119,6 +122,25 @@ func (c *Client) nodeJoin(me serf.MemberEvent) { } } +// nodeUpdate is used to handle update events on the serf cluster +func (c *Client) nodeUpdate(me serf.MemberEvent) { + for _, m := range me.Members { + ok, parts := metadata.IsConsulServer(m) + if !ok { + continue + } + if parts.Datacenter != c.config.Datacenter { + c.logger.Warn("server has joined the wrong cluster: wrong datacenter", + "server", m.Name, + "datacenter", parts.Datacenter, + ) + continue + } + c.logger.Info("updating server", "server", parts.String()) + c.routers.AddServer(parts) + } +} + // nodeFail is used to handle fail events on the serf cluster func (c *Client) nodeFail(me serf.MemberEvent) { for _, m := range me.Members { diff --git a/agent/consul/enterprise_client_oss.go b/agent/consul/enterprise_client_oss.go index 290077ea89..2b73e6db86 100644 --- a/agent/consul/enterprise_client_oss.go +++ b/agent/consul/enterprise_client_oss.go @@ -23,3 +23,7 @@ func (c *Client) handleEnterpriseUserEvents(event serf.UserEvent) bool { func (c *Client) enterpriseStats() map[string]map[string]string { return nil } + +func (_ *Client) addEnterpriseSerfTags(_ map[string]string) { + // do nothing +} diff --git a/agent/consul/enterprise_server_oss.go b/agent/consul/enterprise_server_oss.go index 365e0900ae..9db4a9703c 100644 --- a/agent/consul/enterprise_server_oss.go +++ b/agent/consul/enterprise_server_oss.go @@ -54,3 +54,7 @@ func (s *Server) revokeEnterpriseLeadership() error { func (s *Server) validateEnterpriseRequest(entMeta *structs.EnterpriseMeta, write bool) error { return nil } + +func (_ *Server) addEnterpriseSerfTags(_ map[string]string) { + // do nothing +} diff --git a/agent/consul/server_serf.go b/agent/consul/server_serf.go index 3c1da46c10..a581a6fc4d 100644 --- a/agent/consul/server_serf.go +++ b/agent/consul/server_serf.go @@ -118,6 +118,8 @@ func (s *Server) setupSerf(conf *serf.Config, ch chan serf.Event, path string, w return nil, err } + s.addEnterpriseSerfTags(conf.Tags) + return serf.Create(conf) } @@ -241,6 +243,19 @@ func (s *Server) lanNodeJoin(me serf.MemberEvent) { } } +func (s *Server) lanNodeUpdate(me serf.MemberEvent) { + for _, m := range me.Members { + ok, serverMeta := metadata.IsConsulServer(m) + if !ok || serverMeta.Segment != "" { + continue + } + s.logger.Info("Updating LAN server", "server", serverMeta.String()) + + // Update server lookup + s.serverLookup.AddServer(serverMeta) + } +} + // maybeBootstrap is used to handle bootstrapping when a new consul server joins. func (s *Server) maybeBootstrap() { // Bootstrap can only be done if there are no committed logs, remove our diff --git a/agent/metadata/server.go b/agent/metadata/server.go index 2532fa3bfa..60c04e888f 100644 --- a/agent/metadata/server.go +++ b/agent/metadata/server.go @@ -41,6 +41,7 @@ type Server struct { Status serf.MemberStatus NonVoter bool ACLs structs.ACLMode + FeatureFlags map[string]int // If true, use TLS when connecting to this server UseTLS bool @@ -103,6 +104,7 @@ func IsConsulServer(m serf.Member) (bool, *Server) { segmentAddrs := make(map[string]string) segmentPorts := make(map[string]int) + featureFlags := make(map[string]int) for name, value := range m.Tags { if strings.HasPrefix(name, "sl_") { addr, port, err := net.SplitHostPort(value) @@ -117,6 +119,13 @@ func IsConsulServer(m serf.Member) (bool, *Server) { segmentName := strings.TrimPrefix(name, "sl_") segmentAddrs[segmentName] = addr segmentPorts[segmentName] = segmentPort + } else if strings.HasPrefix(name, "ft_") { + featureName := strings.TrimPrefix(name, "ft_") + featureState, err := strconv.Atoi(value) + if err != nil { + return false, nil + } + featureFlags[featureName] = featureState } } @@ -173,6 +182,7 @@ func IsConsulServer(m serf.Member) (bool, *Server) { UseTLS: useTLS, NonVoter: nonVoter, ACLs: acls, + FeatureFlags: featureFlags, } return true, parts } diff --git a/agent/router/manager.go b/agent/router/manager.go index 1790d62c28..a293bb3dca 100644 --- a/agent/router/manager.go +++ b/agent/router/manager.go @@ -149,6 +149,18 @@ func (m *Manager) AddServer(s *metadata.Server) { m.saveServerList(l) } +// UpdateTLS updates the TLS setting for the servers in this manager +func (m *Manager) UpdateTLS(useTLS bool) { + m.listLock.Lock() + defer m.listLock.Unlock() + + list := m.getServerList() + for _, server := range list.servers { + server.UseTLS = useTLS + } + m.saveServerList(list) +} + // cycleServers returns a new list of servers that has dequeued the first // server and enqueued it at the end of the list. cycleServers assumes the // caller is holding the listLock. cycleServer does not test or ping @@ -218,6 +230,19 @@ func (m *Manager) FindServer() *metadata.Server { return l.servers[0] } +func (m *Manager) checkServers(fn func(srv *metadata.Server) bool) bool { + for _, srv := range m.getServerList().servers { + if !fn(srv) { + return false + } + } + return true +} + +func (m *Manager) CheckServers(fn func(srv *metadata.Server) bool) { + _ = m.checkServers(fn) +} + // getServerList is a convenience method which hides the locking semantics // of atomic.Value from the caller. func (m *Manager) getServerList() serverList { diff --git a/agent/router/router.go b/agent/router/router.go index 7a0ad75295..4cdc864b06 100644 --- a/agent/router/router.go +++ b/agent/router/router.go @@ -348,6 +348,28 @@ func (r *Router) findDirectRoute(datacenter string) (*Manager, *metadata.Server, return nil, nil, false } +// CheckServers returns thwo things +// 1. bool to indicate whether any servers were processed +// 2. error if any propagated from the fn +// +// The fn called should return a bool indicating whether checks should continue and an error +// If an error is returned then checks will stop immediately +func (r *Router) CheckServers(dc string, fn func(srv *metadata.Server) bool) { + r.RLock() + defer r.RUnlock() + + managers, ok := r.managers[dc] + if !ok { + return + } + + for _, m := range managers { + if !m.checkServers(fn) { + return + } + } +} + // GetDatacenters returns a list of datacenters known to the router, sorted by // name. func (r *Router) GetDatacenters() []string { diff --git a/agent/router/serf_adapter.go b/agent/router/serf_adapter.go index 66d002c3fd..82ed00e759 100644 --- a/agent/router/serf_adapter.go +++ b/agent/router/serf_adapter.go @@ -69,6 +69,7 @@ func HandleSerfEvents(logger hclog.Logger, router *Router, areaID types.AreaID, // All of these event types are ignored. case serf.EventMemberUpdate: + handleMemberEvent(logger, router.AddServer, areaID, e) case serf.EventUser: case serf.EventQuery: