mirror of
https://github.com/status-im/consul.git
synced 2025-01-10 22:06:20 +00:00
agent: move agent/consul/agent to agent/metadata
This commit is contained in:
parent
c395599cea
commit
7cff50a4df
@ -8,7 +8,7 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/armon/go-metrics"
|
"github.com/armon/go-metrics"
|
||||||
"github.com/hashicorp/consul/agent/consul/agent"
|
"github.com/hashicorp/consul/agent/metadata"
|
||||||
"github.com/hashicorp/consul/agent/structs"
|
"github.com/hashicorp/consul/agent/structs"
|
||||||
"github.com/hashicorp/go-version"
|
"github.com/hashicorp/go-version"
|
||||||
"github.com/hashicorp/raft"
|
"github.com/hashicorp/raft"
|
||||||
@ -90,7 +90,7 @@ func (s *Server) pruneDeadServers(autopilotConfig *structs.AutopilotConfig) erro
|
|||||||
}
|
}
|
||||||
|
|
||||||
for _, member := range s.serfLAN.Members() {
|
for _, member := range s.serfLAN.Members() {
|
||||||
valid, parts := agent.IsConsulServer(member)
|
valid, parts := metadata.IsConsulServer(member)
|
||||||
|
|
||||||
if valid {
|
if valid {
|
||||||
// Remove this server from the stale list; it has a serf entry
|
// Remove this server from the stale list; it has a serf entry
|
||||||
@ -275,13 +275,13 @@ func (s *Server) updateClusterHealth() error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Get the the serf members which are Consul servers
|
// Get the the serf members which are Consul servers
|
||||||
serverMap := make(map[string]*agent.Server)
|
serverMap := make(map[string]*metadata.Server)
|
||||||
for _, member := range s.LANMembers() {
|
for _, member := range s.LANMembers() {
|
||||||
if member.Status == serf.StatusLeft {
|
if member.Status == serf.StatusLeft {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
valid, parts := agent.IsConsulServer(member)
|
valid, parts := metadata.IsConsulServer(member)
|
||||||
if valid {
|
if valid {
|
||||||
serverMap[parts.ID] = parts
|
serverMap[parts.ID] = parts
|
||||||
}
|
}
|
||||||
@ -297,7 +297,7 @@ func (s *Server) updateClusterHealth() error {
|
|||||||
// consistent of a sample as possible. We capture the leader's index
|
// consistent of a sample as possible. We capture the leader's index
|
||||||
// here as well so it roughly lines up with the same point in time.
|
// here as well so it roughly lines up with the same point in time.
|
||||||
targetLastIndex := s.raft.LastIndex()
|
targetLastIndex := s.raft.LastIndex()
|
||||||
var fetchList []*agent.Server
|
var fetchList []*metadata.Server
|
||||||
for _, server := range servers {
|
for _, server := range servers {
|
||||||
if parts, ok := serverMap[string(server.ID)]; ok {
|
if parts, ok := serverMap[string(server.ID)]; ok {
|
||||||
fetchList = append(fetchList, parts)
|
fetchList = append(fetchList, parts)
|
||||||
@ -377,7 +377,7 @@ func (s *Server) updateClusterHealth() error {
|
|||||||
// updateServerHealth computes the resulting health of the server based on its
|
// updateServerHealth computes the resulting health of the server based on its
|
||||||
// fetched stats and the state of the leader.
|
// fetched stats and the state of the leader.
|
||||||
func (s *Server) updateServerHealth(health *structs.ServerHealth,
|
func (s *Server) updateServerHealth(health *structs.ServerHealth,
|
||||||
server *agent.Server, stats *structs.ServerStats,
|
server *metadata.Server, stats *structs.ServerStats,
|
||||||
autopilotConf *structs.AutopilotConfig, targetLastIndex uint64) error {
|
autopilotConf *structs.AutopilotConfig, targetLastIndex uint64) error {
|
||||||
|
|
||||||
health.LastTerm = stats.LastTerm
|
health.LastTerm = stats.LastTerm
|
||||||
|
@ -11,7 +11,7 @@ import (
|
|||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/hashicorp/consul/agent/consul/agent"
|
"github.com/hashicorp/consul/agent/metadata"
|
||||||
"github.com/hashicorp/consul/agent/pool"
|
"github.com/hashicorp/consul/agent/pool"
|
||||||
"github.com/hashicorp/consul/agent/router"
|
"github.com/hashicorp/consul/agent/router"
|
||||||
"github.com/hashicorp/consul/agent/structs"
|
"github.com/hashicorp/consul/agent/structs"
|
||||||
@ -275,7 +275,7 @@ func (c *Client) lanEventHandler() {
|
|||||||
// nodeJoin is used to handle join events on the serf cluster
|
// nodeJoin is used to handle join events on the serf cluster
|
||||||
func (c *Client) nodeJoin(me serf.MemberEvent) {
|
func (c *Client) nodeJoin(me serf.MemberEvent) {
|
||||||
for _, m := range me.Members {
|
for _, m := range me.Members {
|
||||||
ok, parts := agent.IsConsulServer(m)
|
ok, parts := metadata.IsConsulServer(m)
|
||||||
if !ok {
|
if !ok {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
@ -297,7 +297,7 @@ func (c *Client) nodeJoin(me serf.MemberEvent) {
|
|||||||
// nodeFail is used to handle fail events on the serf cluster
|
// nodeFail is used to handle fail events on the serf cluster
|
||||||
func (c *Client) nodeFail(me serf.MemberEvent) {
|
func (c *Client) nodeFail(me serf.MemberEvent) {
|
||||||
for _, m := range me.Members {
|
for _, m := range me.Members {
|
||||||
ok, parts := agent.IsConsulServer(m)
|
ok, parts := metadata.IsConsulServer(m)
|
||||||
if !ok {
|
if !ok {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
@ -9,7 +9,7 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/armon/go-metrics"
|
"github.com/armon/go-metrics"
|
||||||
"github.com/hashicorp/consul/agent/consul/agent"
|
"github.com/hashicorp/consul/agent/metadata"
|
||||||
"github.com/hashicorp/consul/agent/structs"
|
"github.com/hashicorp/consul/agent/structs"
|
||||||
"github.com/hashicorp/consul/api"
|
"github.com/hashicorp/consul/api"
|
||||||
"github.com/hashicorp/consul/types"
|
"github.com/hashicorp/consul/types"
|
||||||
@ -440,7 +440,7 @@ func (s *Server) shouldHandleMember(member serf.Member) bool {
|
|||||||
if valid, dc := isConsulNode(member); valid && dc == s.config.Datacenter {
|
if valid, dc := isConsulNode(member); valid && dc == s.config.Datacenter {
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
if valid, parts := agent.IsConsulServer(member); valid && parts.Datacenter == s.config.Datacenter {
|
if valid, parts := metadata.IsConsulServer(member); valid && parts.Datacenter == s.config.Datacenter {
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
return false
|
return false
|
||||||
@ -451,7 +451,7 @@ func (s *Server) shouldHandleMember(member serf.Member) bool {
|
|||||||
func (s *Server) handleAliveMember(member serf.Member) error {
|
func (s *Server) handleAliveMember(member serf.Member) error {
|
||||||
// Register consul service if a server
|
// Register consul service if a server
|
||||||
var service *structs.NodeService
|
var service *structs.NodeService
|
||||||
if valid, parts := agent.IsConsulServer(member); valid {
|
if valid, parts := metadata.IsConsulServer(member); valid {
|
||||||
service = &structs.NodeService{
|
service = &structs.NodeService{
|
||||||
ID: structs.ConsulServiceID,
|
ID: structs.ConsulServiceID,
|
||||||
Service: structs.ConsulServiceName,
|
Service: structs.ConsulServiceName,
|
||||||
@ -595,7 +595,7 @@ func (s *Server) handleDeregisterMember(reason string, member serf.Member) error
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Remove from Raft peers if this was a server
|
// Remove from Raft peers if this was a server
|
||||||
if valid, parts := agent.IsConsulServer(member); valid {
|
if valid, parts := metadata.IsConsulServer(member); valid {
|
||||||
if err := s.removeConsulServer(member, parts.Port); err != nil {
|
if err := s.removeConsulServer(member, parts.Port); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -622,7 +622,7 @@ func (s *Server) handleDeregisterMember(reason string, member serf.Member) error
|
|||||||
}
|
}
|
||||||
|
|
||||||
// joinConsulServer is used to try to join another consul server
|
// joinConsulServer is used to try to join another consul server
|
||||||
func (s *Server) joinConsulServer(m serf.Member, parts *agent.Server) error {
|
func (s *Server) joinConsulServer(m serf.Member, parts *metadata.Server) error {
|
||||||
// Do not join ourself
|
// Do not join ourself
|
||||||
if m.Name == s.config.NodeName {
|
if m.Name == s.config.NodeName {
|
||||||
return nil
|
return nil
|
||||||
@ -632,7 +632,7 @@ func (s *Server) joinConsulServer(m serf.Member, parts *agent.Server) error {
|
|||||||
if parts.Bootstrap {
|
if parts.Bootstrap {
|
||||||
members := s.serfLAN.Members()
|
members := s.serfLAN.Members()
|
||||||
for _, member := range members {
|
for _, member := range members {
|
||||||
valid, p := agent.IsConsulServer(member)
|
valid, p := metadata.IsConsulServer(member)
|
||||||
if valid && member.Name != m.Name && p.Bootstrap {
|
if valid && member.Name != m.Name && p.Bootstrap {
|
||||||
s.logger.Printf("[ERR] consul: '%v' and '%v' are both in bootstrap mode. Only one node should be in bootstrap mode, not adding Raft peer.", m.Name, member.Name)
|
s.logger.Printf("[ERR] consul: '%v' and '%v' are both in bootstrap mode. Only one node should be in bootstrap mode, not adding Raft peer.", m.Name, member.Name)
|
||||||
return nil
|
return nil
|
||||||
@ -732,7 +732,7 @@ func (s *Server) removeConsulServer(m serf.Member, port int) error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
_, parts := agent.IsConsulServer(m)
|
_, parts := metadata.IsConsulServer(m)
|
||||||
|
|
||||||
// Pick which remove API to use based on how the server was added.
|
// Pick which remove API to use based on how the server was added.
|
||||||
for _, server := range configFuture.Configuration().Servers {
|
for _, server := range configFuture.Configuration().Servers {
|
||||||
|
@ -3,7 +3,7 @@ package consul
|
|||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
"github.com/hashicorp/consul/agent/consul/agent"
|
"github.com/hashicorp/consul/agent/metadata"
|
||||||
"github.com/hashicorp/consul/types"
|
"github.com/hashicorp/consul/types"
|
||||||
"github.com/hashicorp/serf/serf"
|
"github.com/hashicorp/serf/serf"
|
||||||
)
|
)
|
||||||
@ -48,7 +48,7 @@ func (md *lanMergeDelegate) NotifyMerge(members []*serf.Member) error {
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
ok, parts := agent.IsConsulServer(*m)
|
ok, parts := metadata.IsConsulServer(*m)
|
||||||
if ok && parts.Datacenter != md.dc {
|
if ok && parts.Datacenter != md.dc {
|
||||||
return fmt.Errorf("Member '%s' part of wrong datacenter '%s'",
|
return fmt.Errorf("Member '%s' part of wrong datacenter '%s'",
|
||||||
m.Name, parts.Datacenter)
|
m.Name, parts.Datacenter)
|
||||||
@ -65,7 +65,7 @@ type wanMergeDelegate struct {
|
|||||||
|
|
||||||
func (md *wanMergeDelegate) NotifyMerge(members []*serf.Member) error {
|
func (md *wanMergeDelegate) NotifyMerge(members []*serf.Member) error {
|
||||||
for _, m := range members {
|
for _, m := range members {
|
||||||
ok, _ := agent.IsConsulServer(*m)
|
ok, _ := metadata.IsConsulServer(*m)
|
||||||
if !ok {
|
if !ok {
|
||||||
return fmt.Errorf("Member '%s' is not a server", m.Name)
|
return fmt.Errorf("Member '%s' is not a server", m.Name)
|
||||||
}
|
}
|
||||||
|
@ -4,7 +4,7 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"net"
|
"net"
|
||||||
|
|
||||||
"github.com/hashicorp/consul/agent/consul/agent"
|
"github.com/hashicorp/consul/agent/metadata"
|
||||||
"github.com/hashicorp/consul/agent/structs"
|
"github.com/hashicorp/consul/agent/structs"
|
||||||
"github.com/hashicorp/raft"
|
"github.com/hashicorp/raft"
|
||||||
"github.com/hashicorp/serf/serf"
|
"github.com/hashicorp/serf/serf"
|
||||||
@ -35,7 +35,7 @@ func (op *Operator) RaftGetConfiguration(args *structs.DCSpecificRequest, reply
|
|||||||
// Index the Consul information about the servers.
|
// Index the Consul information about the servers.
|
||||||
serverMap := make(map[raft.ServerAddress]serf.Member)
|
serverMap := make(map[raft.ServerAddress]serf.Member)
|
||||||
for _, member := range op.srv.serfLAN.Members() {
|
for _, member := range op.srv.serfLAN.Members() {
|
||||||
valid, parts := agent.IsConsulServer(member)
|
valid, parts := metadata.IsConsulServer(member)
|
||||||
if !valid {
|
if !valid {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
@ -9,12 +9,12 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/armon/go-metrics"
|
"github.com/armon/go-metrics"
|
||||||
"github.com/hashicorp/consul/agent/consul/agent"
|
|
||||||
"github.com/hashicorp/consul/agent/consul/state"
|
"github.com/hashicorp/consul/agent/consul/state"
|
||||||
|
"github.com/hashicorp/consul/agent/metadata"
|
||||||
"github.com/hashicorp/consul/agent/pool"
|
"github.com/hashicorp/consul/agent/pool"
|
||||||
"github.com/hashicorp/consul/agent/structs"
|
"github.com/hashicorp/consul/agent/structs"
|
||||||
"github.com/hashicorp/consul/lib"
|
"github.com/hashicorp/consul/lib"
|
||||||
"github.com/hashicorp/go-memdb"
|
memdb "github.com/hashicorp/go-memdb"
|
||||||
"github.com/hashicorp/memberlist"
|
"github.com/hashicorp/memberlist"
|
||||||
"github.com/hashicorp/net-rpc-msgpackrpc"
|
"github.com/hashicorp/net-rpc-msgpackrpc"
|
||||||
"github.com/hashicorp/yamux"
|
"github.com/hashicorp/yamux"
|
||||||
@ -225,7 +225,7 @@ CHECK_LEADER:
|
|||||||
// getLeader returns if the current node is the leader, and if not then it
|
// getLeader returns if the current node is the leader, and if not then it
|
||||||
// returns the leader which is potentially nil if the cluster has not yet
|
// returns the leader which is potentially nil if the cluster has not yet
|
||||||
// elected a leader.
|
// elected a leader.
|
||||||
func (s *Server) getLeader() (bool, *agent.Server) {
|
func (s *Server) getLeader() (bool, *metadata.Server) {
|
||||||
// Check if we are the leader
|
// Check if we are the leader
|
||||||
if s.IsLeader() {
|
if s.IsLeader() {
|
||||||
return true, nil
|
return true, nil
|
||||||
@ -247,7 +247,7 @@ func (s *Server) getLeader() (bool, *agent.Server) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// forwardLeader is used to forward an RPC call to the leader, or fail if no leader
|
// forwardLeader is used to forward an RPC call to the leader, or fail if no leader
|
||||||
func (s *Server) forwardLeader(server *agent.Server, method string, args interface{}, reply interface{}) error {
|
func (s *Server) forwardLeader(server *metadata.Server, method string, args interface{}, reply interface{}) error {
|
||||||
// Handle a missing server
|
// Handle a missing server
|
||||||
if server == nil {
|
if server == nil {
|
||||||
return structs.ErrNoLeader
|
return structs.ErrNoLeader
|
||||||
|
@ -4,7 +4,7 @@ import (
|
|||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/hashicorp/consul/agent/consul/agent"
|
"github.com/hashicorp/consul/agent/metadata"
|
||||||
"github.com/hashicorp/raft"
|
"github.com/hashicorp/raft"
|
||||||
"github.com/hashicorp/serf/serf"
|
"github.com/hashicorp/serf/serf"
|
||||||
)
|
)
|
||||||
@ -125,7 +125,7 @@ func (s *Server) localEvent(event serf.UserEvent) {
|
|||||||
// lanNodeJoin is used to handle join events on the LAN pool.
|
// lanNodeJoin is used to handle join events on the LAN pool.
|
||||||
func (s *Server) lanNodeJoin(me serf.MemberEvent) {
|
func (s *Server) lanNodeJoin(me serf.MemberEvent) {
|
||||||
for _, m := range me.Members {
|
for _, m := range me.Members {
|
||||||
ok, parts := agent.IsConsulServer(m)
|
ok, parts := metadata.IsConsulServer(m)
|
||||||
if !ok {
|
if !ok {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
@ -166,9 +166,9 @@ func (s *Server) maybeBootstrap() {
|
|||||||
|
|
||||||
// Scan for all the known servers.
|
// Scan for all the known servers.
|
||||||
members := s.serfLAN.Members()
|
members := s.serfLAN.Members()
|
||||||
var servers []agent.Server
|
var servers []metadata.Server
|
||||||
for _, member := range members {
|
for _, member := range members {
|
||||||
valid, p := agent.IsConsulServer(member)
|
valid, p := metadata.IsConsulServer(member)
|
||||||
if !valid {
|
if !valid {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
@ -265,7 +265,7 @@ func (s *Server) maybeBootstrap() {
|
|||||||
// lanNodeFailed is used to handle fail events on the LAN pool.
|
// lanNodeFailed is used to handle fail events on the LAN pool.
|
||||||
func (s *Server) lanNodeFailed(me serf.MemberEvent) {
|
func (s *Server) lanNodeFailed(me serf.MemberEvent) {
|
||||||
for _, m := range me.Members {
|
for _, m := range me.Members {
|
||||||
ok, parts := agent.IsConsulServer(m)
|
ok, parts := metadata.IsConsulServer(m)
|
||||||
if !ok {
|
if !ok {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
@ -18,8 +18,8 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/hashicorp/consul/acl"
|
"github.com/hashicorp/consul/acl"
|
||||||
"github.com/hashicorp/consul/agent/consul/agent"
|
|
||||||
"github.com/hashicorp/consul/agent/consul/state"
|
"github.com/hashicorp/consul/agent/consul/state"
|
||||||
|
"github.com/hashicorp/consul/agent/metadata"
|
||||||
"github.com/hashicorp/consul/agent/pool"
|
"github.com/hashicorp/consul/agent/pool"
|
||||||
"github.com/hashicorp/consul/agent/router"
|
"github.com/hashicorp/consul/agent/router"
|
||||||
"github.com/hashicorp/consul/agent/structs"
|
"github.com/hashicorp/consul/agent/structs"
|
||||||
@ -28,7 +28,7 @@ import (
|
|||||||
"github.com/hashicorp/consul/tlsutil"
|
"github.com/hashicorp/consul/tlsutil"
|
||||||
"github.com/hashicorp/consul/types"
|
"github.com/hashicorp/consul/types"
|
||||||
"github.com/hashicorp/raft"
|
"github.com/hashicorp/raft"
|
||||||
"github.com/hashicorp/raft-boltdb"
|
raftboltdb "github.com/hashicorp/raft-boltdb"
|
||||||
"github.com/hashicorp/serf/coordinate"
|
"github.com/hashicorp/serf/coordinate"
|
||||||
"github.com/hashicorp/serf/serf"
|
"github.com/hashicorp/serf/serf"
|
||||||
)
|
)
|
||||||
@ -125,7 +125,7 @@ type Server struct {
|
|||||||
|
|
||||||
// localConsuls is used to track the known consuls
|
// localConsuls is used to track the known consuls
|
||||||
// in the local datacenter. Used to do leader forwarding.
|
// in the local datacenter. Used to do leader forwarding.
|
||||||
localConsuls map[raft.ServerAddress]*agent.Server
|
localConsuls map[raft.ServerAddress]*metadata.Server
|
||||||
localLock sync.RWMutex
|
localLock sync.RWMutex
|
||||||
|
|
||||||
// Logger uses the provided LogOutput
|
// Logger uses the provided LogOutput
|
||||||
@ -295,7 +295,7 @@ func NewServerLogger(config *Config, logger *log.Logger, tokens *token.Store) (*
|
|||||||
connPool: connPool,
|
connPool: connPool,
|
||||||
eventChLAN: make(chan serf.Event, 256),
|
eventChLAN: make(chan serf.Event, 256),
|
||||||
eventChWAN: make(chan serf.Event, 256),
|
eventChWAN: make(chan serf.Event, 256),
|
||||||
localConsuls: make(map[raft.ServerAddress]*agent.Server),
|
localConsuls: make(map[raft.ServerAddress]*metadata.Server),
|
||||||
logger: logger,
|
logger: logger,
|
||||||
reconcileCh: make(chan serf.Member, 32),
|
reconcileCh: make(chan serf.Member, 32),
|
||||||
router: router.NewRouter(logger, config.Datacenter),
|
router: router.NewRouter(logger, config.Datacenter),
|
||||||
@ -385,7 +385,7 @@ func NewServerLogger(config *Config, logger *log.Logger, tokens *token.Store) (*
|
|||||||
go router.HandleSerfEvents(s.logger, s.router, types.AreaWAN, s.serfWAN.ShutdownCh(), s.eventChWAN)
|
go router.HandleSerfEvents(s.logger, s.router, types.AreaWAN, s.serfWAN.ShutdownCh(), s.eventChWAN)
|
||||||
|
|
||||||
// Fire up the LAN <-> WAN join flooder.
|
// Fire up the LAN <-> WAN join flooder.
|
||||||
portFn := func(s *agent.Server) (int, bool) {
|
portFn := func(s *metadata.Server) (int, bool) {
|
||||||
if s.WanJoinPort > 0 {
|
if s.WanJoinPort > 0 {
|
||||||
return s.WanJoinPort, true
|
return s.WanJoinPort, true
|
||||||
}
|
}
|
||||||
|
@ -11,7 +11,7 @@ import (
|
|||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/hashicorp/consul/agent/consul/agent"
|
"github.com/hashicorp/consul/agent/metadata"
|
||||||
"github.com/hashicorp/consul/agent/token"
|
"github.com/hashicorp/consul/agent/token"
|
||||||
"github.com/hashicorp/consul/testrpc"
|
"github.com/hashicorp/consul/testrpc"
|
||||||
"github.com/hashicorp/consul/testutil"
|
"github.com/hashicorp/consul/testutil"
|
||||||
@ -667,7 +667,7 @@ func testVerifyRPC(s1, s2 *Server, t *testing.T) (bool, error) {
|
|||||||
|
|
||||||
// Have s2 make an RPC call to s1
|
// Have s2 make an RPC call to s1
|
||||||
s2.localLock.RLock()
|
s2.localLock.RLock()
|
||||||
var leader *agent.Server
|
var leader *metadata.Server
|
||||||
for _, server := range s2.localConsuls {
|
for _, server := range s2.localConsuls {
|
||||||
if server.Name == s1.config.NodeName {
|
if server.Name == s1.config.NodeName {
|
||||||
leader = server
|
leader = server
|
||||||
|
@ -5,7 +5,7 @@ import (
|
|||||||
"log"
|
"log"
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
"github.com/hashicorp/consul/agent/consul/agent"
|
"github.com/hashicorp/consul/agent/metadata"
|
||||||
"github.com/hashicorp/consul/agent/pool"
|
"github.com/hashicorp/consul/agent/pool"
|
||||||
"github.com/hashicorp/consul/agent/structs"
|
"github.com/hashicorp/consul/agent/structs"
|
||||||
)
|
)
|
||||||
@ -39,7 +39,7 @@ func NewStatsFetcher(logger *log.Logger, pool *pool.ConnPool, datacenter string)
|
|||||||
// cancel this when the context is canceled because we only want one in-flight
|
// cancel this when the context is canceled because we only want one in-flight
|
||||||
// RPC to each server, so we let it finish and then clean up the in-flight
|
// RPC to each server, so we let it finish and then clean up the in-flight
|
||||||
// tracking.
|
// tracking.
|
||||||
func (f *StatsFetcher) fetch(server *agent.Server, replyCh chan *structs.ServerStats) {
|
func (f *StatsFetcher) fetch(server *metadata.Server, replyCh chan *structs.ServerStats) {
|
||||||
var args struct{}
|
var args struct{}
|
||||||
var reply structs.ServerStats
|
var reply structs.ServerStats
|
||||||
err := f.pool.RPC(f.datacenter, server.Addr, server.Version, "Status.RaftStats", server.UseTLS, &args, &reply)
|
err := f.pool.RPC(f.datacenter, server.Addr, server.Version, "Status.RaftStats", server.UseTLS, &args, &reply)
|
||||||
@ -56,9 +56,9 @@ func (f *StatsFetcher) fetch(server *agent.Server, replyCh chan *structs.ServerS
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Fetch will attempt to query all the servers in parallel.
|
// Fetch will attempt to query all the servers in parallel.
|
||||||
func (f *StatsFetcher) Fetch(ctx context.Context, servers []*agent.Server) map[string]*structs.ServerStats {
|
func (f *StatsFetcher) Fetch(ctx context.Context, servers []*metadata.Server) map[string]*structs.ServerStats {
|
||||||
type workItem struct {
|
type workItem struct {
|
||||||
server *agent.Server
|
server *metadata.Server
|
||||||
replyCh chan *structs.ServerStats
|
replyCh chan *structs.ServerStats
|
||||||
}
|
}
|
||||||
var work []*workItem
|
var work []*workItem
|
||||||
|
@ -6,7 +6,7 @@ import (
|
|||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/hashicorp/consul/agent/consul/agent"
|
"github.com/hashicorp/consul/agent/metadata"
|
||||||
"github.com/hashicorp/consul/testrpc"
|
"github.com/hashicorp/consul/testrpc"
|
||||||
"github.com/hashicorp/consul/types"
|
"github.com/hashicorp/consul/types"
|
||||||
)
|
)
|
||||||
@ -34,9 +34,9 @@ func TestStatsFetcher(t *testing.T) {
|
|||||||
t.Fatalf("bad len: %d", len(members))
|
t.Fatalf("bad len: %d", len(members))
|
||||||
}
|
}
|
||||||
|
|
||||||
var servers []*agent.Server
|
var servers []*metadata.Server
|
||||||
for _, member := range members {
|
for _, member := range members {
|
||||||
ok, server := agent.IsConsulServer(member)
|
ok, server := metadata.IsConsulServer(member)
|
||||||
if !ok {
|
if !ok {
|
||||||
t.Fatalf("bad: %#v", member)
|
t.Fatalf("bad: %#v", member)
|
||||||
}
|
}
|
||||||
|
@ -7,7 +7,7 @@ import (
|
|||||||
"runtime"
|
"runtime"
|
||||||
"strconv"
|
"strconv"
|
||||||
|
|
||||||
"github.com/hashicorp/consul/agent/consul/agent"
|
"github.com/hashicorp/consul/agent/metadata"
|
||||||
"github.com/hashicorp/go-version"
|
"github.com/hashicorp/go-version"
|
||||||
"github.com/hashicorp/serf/serf"
|
"github.com/hashicorp/serf/serf"
|
||||||
)
|
)
|
||||||
@ -302,7 +302,7 @@ func runtimeStats() map[string]string {
|
|||||||
// given Consul version
|
// given Consul version
|
||||||
func ServersMeetMinimumVersion(members []serf.Member, minVersion *version.Version) bool {
|
func ServersMeetMinimumVersion(members []serf.Member, minVersion *version.Version) bool {
|
||||||
for _, member := range members {
|
for _, member := range members {
|
||||||
if valid, parts := agent.IsConsulServer(member); valid && parts.Status == serf.StatusAlive {
|
if valid, parts := metadata.IsConsulServer(member); valid && parts.Status == serf.StatusAlive {
|
||||||
if parts.Build.LessThan(minVersion) {
|
if parts.Build.LessThan(minVersion) {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
@ -3,7 +3,7 @@
|
|||||||
// communicate Consul server information. Gossiped information that ends up
|
// communicate Consul server information. Gossiped information that ends up
|
||||||
// in Server contains the necessary metadata required for servers.Manager to
|
// in Server contains the necessary metadata required for servers.Manager to
|
||||||
// select which server an RPC request should be routed to.
|
// select which server an RPC request should be routed to.
|
||||||
package agent
|
package metadata
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
@ -1,4 +1,4 @@
|
|||||||
package agent
|
package metadata
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"testing"
|
"testing"
|
@ -1,10 +1,10 @@
|
|||||||
package agent_test
|
package metadata_test
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"net"
|
"net"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/hashicorp/consul/agent/consul/agent"
|
"github.com/hashicorp/consul/agent/metadata"
|
||||||
"github.com/hashicorp/serf/serf"
|
"github.com/hashicorp/serf/serf"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -14,19 +14,19 @@ func TestServer_Key_params(t *testing.T) {
|
|||||||
|
|
||||||
tests := []struct {
|
tests := []struct {
|
||||||
name string
|
name string
|
||||||
sd1 *agent.Server
|
sd1 *metadata.Server
|
||||||
sd2 *agent.Server
|
sd2 *metadata.Server
|
||||||
equal bool
|
equal bool
|
||||||
}{
|
}{
|
||||||
{
|
{
|
||||||
name: "Addr inequality",
|
name: "Addr inequality",
|
||||||
sd1: &agent.Server{
|
sd1: &metadata.Server{
|
||||||
Name: "s1",
|
Name: "s1",
|
||||||
Datacenter: "dc1",
|
Datacenter: "dc1",
|
||||||
Port: 8300,
|
Port: 8300,
|
||||||
Addr: &net.IPAddr{IP: ipv4a},
|
Addr: &net.IPAddr{IP: ipv4a},
|
||||||
},
|
},
|
||||||
sd2: &agent.Server{
|
sd2: &metadata.Server{
|
||||||
Name: "s1",
|
Name: "s1",
|
||||||
Datacenter: "dc1",
|
Datacenter: "dc1",
|
||||||
Port: 8300,
|
Port: 8300,
|
||||||
@ -42,7 +42,7 @@ func TestServer_Key_params(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Test Key to make sure it actually works as a key
|
// Test Key to make sure it actually works as a key
|
||||||
m := make(map[agent.Key]bool)
|
m := make(map[metadata.Key]bool)
|
||||||
m[*test.sd1.Key()] = true
|
m[*test.sd1.Key()] = true
|
||||||
if _, found := m[*test.sd2.Key()]; found != test.equal {
|
if _, found := m[*test.sd2.Key()]; found != test.equal {
|
||||||
t.Errorf("Expected a %v result from map test %s", test.equal, test.name)
|
t.Errorf("Expected a %v result from map test %s", test.equal, test.name)
|
||||||
@ -68,7 +68,7 @@ func TestIsConsulServer(t *testing.T) {
|
|||||||
},
|
},
|
||||||
Status: serf.StatusLeft,
|
Status: serf.StatusLeft,
|
||||||
}
|
}
|
||||||
ok, parts := agent.IsConsulServer(m)
|
ok, parts := metadata.IsConsulServer(m)
|
||||||
if !ok || parts.Datacenter != "east-aws" || parts.Port != 10000 {
|
if !ok || parts.Datacenter != "east-aws" || parts.Port != 10000 {
|
||||||
t.Fatalf("bad: %v %v", ok, parts)
|
t.Fatalf("bad: %v %v", ok, parts)
|
||||||
}
|
}
|
||||||
@ -101,7 +101,7 @@ func TestIsConsulServer(t *testing.T) {
|
|||||||
}
|
}
|
||||||
m.Tags["bootstrap"] = "1"
|
m.Tags["bootstrap"] = "1"
|
||||||
m.Tags["disabled"] = "1"
|
m.Tags["disabled"] = "1"
|
||||||
ok, parts = agent.IsConsulServer(m)
|
ok, parts = metadata.IsConsulServer(m)
|
||||||
if !ok {
|
if !ok {
|
||||||
t.Fatalf("expected a valid consul server")
|
t.Fatalf("expected a valid consul server")
|
||||||
}
|
}
|
||||||
@ -117,7 +117,7 @@ func TestIsConsulServer(t *testing.T) {
|
|||||||
m.Tags["expect"] = "3"
|
m.Tags["expect"] = "3"
|
||||||
delete(m.Tags, "bootstrap")
|
delete(m.Tags, "bootstrap")
|
||||||
delete(m.Tags, "disabled")
|
delete(m.Tags, "disabled")
|
||||||
ok, parts = agent.IsConsulServer(m)
|
ok, parts = metadata.IsConsulServer(m)
|
||||||
if !ok || parts.Expect != 3 {
|
if !ok || parts.Expect != 3 {
|
||||||
t.Fatalf("bad: %v", parts.Expect)
|
t.Fatalf("bad: %v", parts.Expect)
|
||||||
}
|
}
|
||||||
@ -126,7 +126,7 @@ func TestIsConsulServer(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
delete(m.Tags, "role")
|
delete(m.Tags, "role")
|
||||||
ok, parts = agent.IsConsulServer(m)
|
ok, parts = metadata.IsConsulServer(m)
|
||||||
if ok {
|
if ok {
|
||||||
t.Fatalf("unexpected ok server")
|
t.Fatalf("unexpected ok server")
|
||||||
}
|
}
|
||||||
@ -147,7 +147,7 @@ func TestIsConsulServer_Optional(t *testing.T) {
|
|||||||
// should default to zero.
|
// should default to zero.
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
ok, parts := agent.IsConsulServer(m)
|
ok, parts := metadata.IsConsulServer(m)
|
||||||
if !ok || parts.Datacenter != "east-aws" || parts.Port != 10000 {
|
if !ok || parts.Datacenter != "east-aws" || parts.Port != 10000 {
|
||||||
t.Fatalf("bad: %v %v", ok, parts)
|
t.Fatalf("bad: %v %v", ok, parts)
|
||||||
}
|
}
|
||||||
@ -174,7 +174,7 @@ func TestIsConsulServer_Optional(t *testing.T) {
|
|||||||
}
|
}
|
||||||
m.Tags["bootstrap"] = "1"
|
m.Tags["bootstrap"] = "1"
|
||||||
m.Tags["disabled"] = "1"
|
m.Tags["disabled"] = "1"
|
||||||
ok, parts = agent.IsConsulServer(m)
|
ok, parts = metadata.IsConsulServer(m)
|
||||||
if !ok {
|
if !ok {
|
||||||
t.Fatalf("expected a valid consul server")
|
t.Fatalf("expected a valid consul server")
|
||||||
}
|
}
|
||||||
@ -190,7 +190,7 @@ func TestIsConsulServer_Optional(t *testing.T) {
|
|||||||
m.Tags["expect"] = "3"
|
m.Tags["expect"] = "3"
|
||||||
delete(m.Tags, "bootstrap")
|
delete(m.Tags, "bootstrap")
|
||||||
delete(m.Tags, "disabled")
|
delete(m.Tags, "disabled")
|
||||||
ok, parts = agent.IsConsulServer(m)
|
ok, parts = metadata.IsConsulServer(m)
|
||||||
if !ok || parts.Expect != 3 {
|
if !ok || parts.Expect != 3 {
|
||||||
t.Fatalf("bad: %v", parts.Expect)
|
t.Fatalf("bad: %v", parts.Expect)
|
||||||
}
|
}
|
||||||
@ -199,7 +199,7 @@ func TestIsConsulServer_Optional(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
delete(m.Tags, "role")
|
delete(m.Tags, "role")
|
||||||
ok, parts = agent.IsConsulServer(m)
|
ok, parts = metadata.IsConsulServer(m)
|
||||||
if ok {
|
if ok {
|
||||||
t.Fatalf("unexpected ok server")
|
t.Fatalf("unexpected ok server")
|
||||||
}
|
}
|
@ -1,5 +1,5 @@
|
|||||||
// Package servers provides a Manager interface for Manager managed
|
// Package servers provides a Manager interface for Manager managed
|
||||||
// agent.Server objects. The servers package manages servers from a Consul
|
// metadata.Server objects. The servers package manages servers from a Consul
|
||||||
// client's perspective (i.e. a list of servers that a client talks with for
|
// client's perspective (i.e. a list of servers that a client talks with for
|
||||||
// RPCs). The servers package does not provide any API guarantees and should
|
// RPCs). The servers package does not provide any API guarantees and should
|
||||||
// be called only by `hashicorp/consul`.
|
// be called only by `hashicorp/consul`.
|
||||||
@ -13,7 +13,7 @@ import (
|
|||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/hashicorp/consul/agent/consul/agent"
|
"github.com/hashicorp/consul/agent/metadata"
|
||||||
"github.com/hashicorp/consul/lib"
|
"github.com/hashicorp/consul/lib"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -71,7 +71,7 @@ type Pinger interface {
|
|||||||
type serverList struct {
|
type serverList struct {
|
||||||
// servers tracks the locally known servers. List membership is
|
// servers tracks the locally known servers. List membership is
|
||||||
// maintained by Serf.
|
// maintained by Serf.
|
||||||
servers []*agent.Server
|
servers []*metadata.Server
|
||||||
}
|
}
|
||||||
|
|
||||||
type Manager struct {
|
type Manager struct {
|
||||||
@ -111,7 +111,7 @@ type Manager struct {
|
|||||||
// begin seeing use after the rebalance timer fires or enough servers fail
|
// begin seeing use after the rebalance timer fires or enough servers fail
|
||||||
// organically. If the server is already known, merge the new server
|
// organically. If the server is already known, merge the new server
|
||||||
// details.
|
// details.
|
||||||
func (m *Manager) AddServer(s *agent.Server) {
|
func (m *Manager) AddServer(s *metadata.Server) {
|
||||||
m.listLock.Lock()
|
m.listLock.Lock()
|
||||||
defer m.listLock.Unlock()
|
defer m.listLock.Unlock()
|
||||||
l := m.getServerList()
|
l := m.getServerList()
|
||||||
@ -120,7 +120,7 @@ func (m *Manager) AddServer(s *agent.Server) {
|
|||||||
found := false
|
found := false
|
||||||
for idx, existing := range l.servers {
|
for idx, existing := range l.servers {
|
||||||
if existing.Name == s.Name {
|
if existing.Name == s.Name {
|
||||||
newServers := make([]*agent.Server, len(l.servers))
|
newServers := make([]*metadata.Server, len(l.servers))
|
||||||
copy(newServers, l.servers)
|
copy(newServers, l.servers)
|
||||||
|
|
||||||
// Overwrite the existing server details in order to
|
// Overwrite the existing server details in order to
|
||||||
@ -135,7 +135,7 @@ func (m *Manager) AddServer(s *agent.Server) {
|
|||||||
|
|
||||||
// Add to the list if not known
|
// Add to the list if not known
|
||||||
if !found {
|
if !found {
|
||||||
newServers := make([]*agent.Server, len(l.servers), len(l.servers)+1)
|
newServers := make([]*metadata.Server, len(l.servers), len(l.servers)+1)
|
||||||
copy(newServers, l.servers)
|
copy(newServers, l.servers)
|
||||||
newServers = append(newServers, s)
|
newServers = append(newServers, s)
|
||||||
l.servers = newServers
|
l.servers = newServers
|
||||||
@ -156,13 +156,13 @@ func (m *Manager) AddServer(s *agent.Server) {
|
|||||||
// less desirable than just returning the next server in the firing line. If
|
// less desirable than just returning the next server in the firing line. If
|
||||||
// the next server fails, it will fail fast enough and cycleServer will be
|
// the next server fails, it will fail fast enough and cycleServer will be
|
||||||
// called again.
|
// called again.
|
||||||
func (l *serverList) cycleServer() (servers []*agent.Server) {
|
func (l *serverList) cycleServer() (servers []*metadata.Server) {
|
||||||
numServers := len(l.servers)
|
numServers := len(l.servers)
|
||||||
if numServers < 2 {
|
if numServers < 2 {
|
||||||
return servers // No action required
|
return servers // No action required
|
||||||
}
|
}
|
||||||
|
|
||||||
newServers := make([]*agent.Server, 0, numServers)
|
newServers := make([]*metadata.Server, 0, numServers)
|
||||||
newServers = append(newServers, l.servers[1:]...)
|
newServers = append(newServers, l.servers[1:]...)
|
||||||
newServers = append(newServers, l.servers[0])
|
newServers = append(newServers, l.servers[0])
|
||||||
|
|
||||||
@ -170,7 +170,7 @@ func (l *serverList) cycleServer() (servers []*agent.Server) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// removeServerByKey performs an inline removal of the first matching server
|
// removeServerByKey performs an inline removal of the first matching server
|
||||||
func (l *serverList) removeServerByKey(targetKey *agent.Key) {
|
func (l *serverList) removeServerByKey(targetKey *metadata.Key) {
|
||||||
for i, s := range l.servers {
|
for i, s := range l.servers {
|
||||||
if targetKey.Equal(s.Key()) {
|
if targetKey.Equal(s.Key()) {
|
||||||
copy(l.servers[i:], l.servers[i+1:])
|
copy(l.servers[i:], l.servers[i+1:])
|
||||||
@ -202,7 +202,7 @@ func (m *Manager) IsOffline() bool {
|
|||||||
// server list. If the server at the front of the list has failed or fails
|
// server list. If the server at the front of the list has failed or fails
|
||||||
// during an RPC call, it is rotated to the end of the list. If there are no
|
// during an RPC call, it is rotated to the end of the list. If there are no
|
||||||
// servers available, return nil.
|
// servers available, return nil.
|
||||||
func (m *Manager) FindServer() *agent.Server {
|
func (m *Manager) FindServer() *metadata.Server {
|
||||||
l := m.getServerList()
|
l := m.getServerList()
|
||||||
numServers := len(l.servers)
|
numServers := len(l.servers)
|
||||||
if numServers == 0 {
|
if numServers == 0 {
|
||||||
@ -249,14 +249,14 @@ func New(logger *log.Logger, shutdownCh chan struct{}, clusterInfo ManagerSerfCl
|
|||||||
atomic.StoreInt32(&m.offline, 1)
|
atomic.StoreInt32(&m.offline, 1)
|
||||||
|
|
||||||
l := serverList{}
|
l := serverList{}
|
||||||
l.servers = make([]*agent.Server, 0)
|
l.servers = make([]*metadata.Server, 0)
|
||||||
m.saveServerList(l)
|
m.saveServerList(l)
|
||||||
return m
|
return m
|
||||||
}
|
}
|
||||||
|
|
||||||
// NotifyFailedServer marks the passed in server as "failed" by rotating it
|
// NotifyFailedServer marks the passed in server as "failed" by rotating it
|
||||||
// to the end of the server list.
|
// to the end of the server list.
|
||||||
func (m *Manager) NotifyFailedServer(s *agent.Server) {
|
func (m *Manager) NotifyFailedServer(s *metadata.Server) {
|
||||||
l := m.getServerList()
|
l := m.getServerList()
|
||||||
|
|
||||||
// If the server being failed is not the first server on the list,
|
// If the server being failed is not the first server on the list,
|
||||||
@ -290,7 +290,7 @@ func (m *Manager) NumServers() int {
|
|||||||
return len(l.servers)
|
return len(l.servers)
|
||||||
}
|
}
|
||||||
|
|
||||||
// RebalanceServers shuffles the list of servers on this agent. The server
|
// RebalanceServers shuffles the list of servers on this metadata. The server
|
||||||
// at the front of the list is selected for the next RPC. RPC calls that
|
// at the front of the list is selected for the next RPC. RPC calls that
|
||||||
// fail for a particular server are rotated to the end of the list. This
|
// fail for a particular server are rotated to the end of the list. This
|
||||||
// method reshuffles the list periodically in order to redistribute work
|
// method reshuffles the list periodically in order to redistribute work
|
||||||
@ -376,14 +376,14 @@ func (m *Manager) reconcileServerList(l *serverList) bool {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type targetServer struct {
|
type targetServer struct {
|
||||||
server *agent.Server
|
server *metadata.Server
|
||||||
|
|
||||||
// 'b' == both
|
// 'b' == both
|
||||||
// 'o' == original
|
// 'o' == original
|
||||||
// 'n' == new
|
// 'n' == new
|
||||||
state byte
|
state byte
|
||||||
}
|
}
|
||||||
mergedList := make(map[agent.Key]*targetServer, len(l.servers))
|
mergedList := make(map[metadata.Key]*targetServer, len(l.servers))
|
||||||
for _, s := range l.servers {
|
for _, s := range l.servers {
|
||||||
mergedList[*s.Key()] = &targetServer{server: s, state: 'o'}
|
mergedList[*s.Key()] = &targetServer{server: s, state: 'o'}
|
||||||
}
|
}
|
||||||
@ -425,7 +425,7 @@ func (m *Manager) reconcileServerList(l *serverList) bool {
|
|||||||
|
|
||||||
// RemoveServer takes out an internal write lock and removes a server from
|
// RemoveServer takes out an internal write lock and removes a server from
|
||||||
// the server list.
|
// the server list.
|
||||||
func (m *Manager) RemoveServer(s *agent.Server) {
|
func (m *Manager) RemoveServer(s *metadata.Server) {
|
||||||
m.listLock.Lock()
|
m.listLock.Lock()
|
||||||
defer m.listLock.Unlock()
|
defer m.listLock.Unlock()
|
||||||
l := m.getServerList()
|
l := m.getServerList()
|
||||||
@ -433,7 +433,7 @@ func (m *Manager) RemoveServer(s *agent.Server) {
|
|||||||
// Remove the server if known
|
// Remove the server if known
|
||||||
for i := range l.servers {
|
for i := range l.servers {
|
||||||
if l.servers[i].Name == s.Name {
|
if l.servers[i].Name == s.Name {
|
||||||
newServers := make([]*agent.Server, 0, len(l.servers)-1)
|
newServers := make([]*metadata.Server, 0, len(l.servers)-1)
|
||||||
newServers = append(newServers, l.servers[:i]...)
|
newServers = append(newServers, l.servers[:i]...)
|
||||||
newServers = append(newServers, l.servers[i+1:]...)
|
newServers = append(newServers, l.servers[i+1:]...)
|
||||||
l.servers = newServers
|
l.servers = newServers
|
||||||
|
@ -10,7 +10,7 @@ import (
|
|||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/hashicorp/consul/agent/consul/agent"
|
"github.com/hashicorp/consul/agent/metadata"
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
@ -64,14 +64,14 @@ func testManagerFailProb(failPct float64) (m *Manager) {
|
|||||||
return m
|
return m
|
||||||
}
|
}
|
||||||
|
|
||||||
// func (l *serverList) cycleServer() (servers []*agent.Server) {
|
// func (l *serverList) cycleServer() (servers []*metadata.Server) {
|
||||||
func TestManagerInternal_cycleServer(t *testing.T) {
|
func TestManagerInternal_cycleServer(t *testing.T) {
|
||||||
m := testManager()
|
m := testManager()
|
||||||
l := m.getServerList()
|
l := m.getServerList()
|
||||||
|
|
||||||
server0 := &agent.Server{Name: "server1"}
|
server0 := &metadata.Server{Name: "server1"}
|
||||||
server1 := &agent.Server{Name: "server2"}
|
server1 := &metadata.Server{Name: "server2"}
|
||||||
server2 := &agent.Server{Name: "server3"}
|
server2 := &metadata.Server{Name: "server3"}
|
||||||
l.servers = append(l.servers, server0, server1, server2)
|
l.servers = append(l.servers, server0, server1, server2)
|
||||||
m.saveServerList(l)
|
m.saveServerList(l)
|
||||||
|
|
||||||
@ -167,11 +167,11 @@ func test_reconcileServerList(maxServers int) (bool, error) {
|
|||||||
const failPct = 0.5
|
const failPct = 0.5
|
||||||
m := testManagerFailProb(failPct)
|
m := testManagerFailProb(failPct)
|
||||||
|
|
||||||
var failedServers, healthyServers []*agent.Server
|
var failedServers, healthyServers []*metadata.Server
|
||||||
for i := 0; i < maxServers; i++ {
|
for i := 0; i < maxServers; i++ {
|
||||||
nodeName := fmt.Sprintf("s%02d", i)
|
nodeName := fmt.Sprintf("s%02d", i)
|
||||||
|
|
||||||
node := &agent.Server{Name: nodeName}
|
node := &metadata.Server{Name: nodeName}
|
||||||
// Add 66% of servers to Manager
|
// Add 66% of servers to Manager
|
||||||
if rand.Float64() > 0.33 {
|
if rand.Float64() > 0.33 {
|
||||||
m.AddServer(node)
|
m.AddServer(node)
|
||||||
@ -231,7 +231,7 @@ func test_reconcileServerList(maxServers int) (bool, error) {
|
|||||||
return true, nil
|
return true, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
resultingServerMap := make(map[agent.Key]bool)
|
resultingServerMap := make(map[metadata.Key]bool)
|
||||||
for _, s := range m.getServerList().servers {
|
for _, s := range m.getServerList().servers {
|
||||||
resultingServerMap[*s.Key()] = true
|
resultingServerMap[*s.Key()] = true
|
||||||
}
|
}
|
||||||
@ -303,7 +303,7 @@ func TestManagerInternal_refreshServerRebalanceTimer(t *testing.T) {
|
|||||||
m := New(logger, shutdownCh, &fauxSerf{numNodes: s.numNodes}, &fauxConnPool{})
|
m := New(logger, shutdownCh, &fauxSerf{numNodes: s.numNodes}, &fauxConnPool{})
|
||||||
for i := 0; i < s.numServers; i++ {
|
for i := 0; i < s.numServers; i++ {
|
||||||
nodeName := fmt.Sprintf("s%02d", i)
|
nodeName := fmt.Sprintf("s%02d", i)
|
||||||
m.AddServer(&agent.Server{Name: nodeName})
|
m.AddServer(&metadata.Server{Name: nodeName})
|
||||||
}
|
}
|
||||||
|
|
||||||
d := m.refreshServerRebalanceTimer()
|
d := m.refreshServerRebalanceTimer()
|
||||||
@ -324,7 +324,7 @@ func TestManagerInternal_saveServerList(t *testing.T) {
|
|||||||
t.Fatalf("Manager.saveServerList failed to load init config")
|
t.Fatalf("Manager.saveServerList failed to load init config")
|
||||||
}
|
}
|
||||||
|
|
||||||
newServer := new(agent.Server)
|
newServer := new(metadata.Server)
|
||||||
l.servers = append(l.servers, newServer)
|
l.servers = append(l.servers, newServer)
|
||||||
m.saveServerList(l)
|
m.saveServerList(l)
|
||||||
}()
|
}()
|
||||||
@ -340,7 +340,7 @@ func TestManagerInternal_saveServerList(t *testing.T) {
|
|||||||
|
|
||||||
// Verify mutation w/o a save doesn't alter the original
|
// Verify mutation w/o a save doesn't alter the original
|
||||||
func() {
|
func() {
|
||||||
newServer := new(agent.Server)
|
newServer := new(metadata.Server)
|
||||||
l := m.getServerList()
|
l := m.getServerList()
|
||||||
l.servers = append(l.servers, newServer)
|
l.servers = append(l.servers, newServer)
|
||||||
|
|
||||||
|
@ -9,7 +9,7 @@ import (
|
|||||||
"strings"
|
"strings"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/hashicorp/consul/agent/consul/agent"
|
"github.com/hashicorp/consul/agent/metadata"
|
||||||
"github.com/hashicorp/consul/agent/router"
|
"github.com/hashicorp/consul/agent/router"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -48,7 +48,7 @@ func testManagerFailProb(failPct float64) (m *router.Manager) {
|
|||||||
return m
|
return m
|
||||||
}
|
}
|
||||||
|
|
||||||
// func (m *Manager) AddServer(server *agent.Server) {
|
// func (m *Manager) AddServer(server *metadata.Server) {
|
||||||
func TestServers_AddServer(t *testing.T) {
|
func TestServers_AddServer(t *testing.T) {
|
||||||
m := testManager()
|
m := testManager()
|
||||||
var num int
|
var num int
|
||||||
@ -57,7 +57,7 @@ func TestServers_AddServer(t *testing.T) {
|
|||||||
t.Fatalf("Expected zero servers to start")
|
t.Fatalf("Expected zero servers to start")
|
||||||
}
|
}
|
||||||
|
|
||||||
s1 := &agent.Server{Name: "s1"}
|
s1 := &metadata.Server{Name: "s1"}
|
||||||
m.AddServer(s1)
|
m.AddServer(s1)
|
||||||
num = m.NumServers()
|
num = m.NumServers()
|
||||||
if num != 1 {
|
if num != 1 {
|
||||||
@ -70,7 +70,7 @@ func TestServers_AddServer(t *testing.T) {
|
|||||||
t.Fatalf("Expected one server (still)")
|
t.Fatalf("Expected one server (still)")
|
||||||
}
|
}
|
||||||
|
|
||||||
s2 := &agent.Server{Name: "s2"}
|
s2 := &metadata.Server{Name: "s2"}
|
||||||
m.AddServer(s2)
|
m.AddServer(s2)
|
||||||
num = m.NumServers()
|
num = m.NumServers()
|
||||||
if num != 2 {
|
if num != 2 {
|
||||||
@ -85,7 +85,7 @@ func TestServers_IsOffline(t *testing.T) {
|
|||||||
t.Fatalf("bad")
|
t.Fatalf("bad")
|
||||||
}
|
}
|
||||||
|
|
||||||
s1 := &agent.Server{Name: "s1"}
|
s1 := &metadata.Server{Name: "s1"}
|
||||||
m.AddServer(s1)
|
m.AddServer(s1)
|
||||||
if m.IsOffline() {
|
if m.IsOffline() {
|
||||||
t.Fatalf("bad")
|
t.Fatalf("bad")
|
||||||
@ -117,7 +117,7 @@ func TestServers_IsOffline(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// func (m *Manager) FindServer() (server *agent.Server) {
|
// func (m *Manager) FindServer() (server *metadata.Server) {
|
||||||
func TestServers_FindServer(t *testing.T) {
|
func TestServers_FindServer(t *testing.T) {
|
||||||
m := testManager()
|
m := testManager()
|
||||||
|
|
||||||
@ -125,7 +125,7 @@ func TestServers_FindServer(t *testing.T) {
|
|||||||
t.Fatalf("Expected nil return")
|
t.Fatalf("Expected nil return")
|
||||||
}
|
}
|
||||||
|
|
||||||
m.AddServer(&agent.Server{Name: "s1"})
|
m.AddServer(&metadata.Server{Name: "s1"})
|
||||||
if m.NumServers() != 1 {
|
if m.NumServers() != 1 {
|
||||||
t.Fatalf("Expected one server")
|
t.Fatalf("Expected one server")
|
||||||
}
|
}
|
||||||
@ -143,7 +143,7 @@ func TestServers_FindServer(t *testing.T) {
|
|||||||
t.Fatalf("Expected s1 server (still)")
|
t.Fatalf("Expected s1 server (still)")
|
||||||
}
|
}
|
||||||
|
|
||||||
m.AddServer(&agent.Server{Name: "s2"})
|
m.AddServer(&metadata.Server{Name: "s2"})
|
||||||
if m.NumServers() != 2 {
|
if m.NumServers() != 2 {
|
||||||
t.Fatalf("Expected two servers")
|
t.Fatalf("Expected two servers")
|
||||||
}
|
}
|
||||||
@ -175,7 +175,7 @@ func TestServers_New(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// func (m *Manager) NotifyFailedServer(server *agent.Server) {
|
// func (m *Manager) NotifyFailedServer(server *metadata.Server) {
|
||||||
func TestServers_NotifyFailedServer(t *testing.T) {
|
func TestServers_NotifyFailedServer(t *testing.T) {
|
||||||
m := testManager()
|
m := testManager()
|
||||||
|
|
||||||
@ -183,8 +183,8 @@ func TestServers_NotifyFailedServer(t *testing.T) {
|
|||||||
t.Fatalf("Expected zero servers to start")
|
t.Fatalf("Expected zero servers to start")
|
||||||
}
|
}
|
||||||
|
|
||||||
s1 := &agent.Server{Name: "s1"}
|
s1 := &metadata.Server{Name: "s1"}
|
||||||
s2 := &agent.Server{Name: "s2"}
|
s2 := &metadata.Server{Name: "s2"}
|
||||||
|
|
||||||
// Try notifying for a server that is not managed by Manager
|
// Try notifying for a server that is not managed by Manager
|
||||||
m.NotifyFailedServer(s1)
|
m.NotifyFailedServer(s1)
|
||||||
@ -237,7 +237,7 @@ func TestServers_NumServers(t *testing.T) {
|
|||||||
t.Fatalf("Expected zero servers to start")
|
t.Fatalf("Expected zero servers to start")
|
||||||
}
|
}
|
||||||
|
|
||||||
s := &agent.Server{}
|
s := &metadata.Server{}
|
||||||
m.AddServer(s)
|
m.AddServer(s)
|
||||||
num = m.NumServers()
|
num = m.NumServers()
|
||||||
if num != 1 {
|
if num != 1 {
|
||||||
@ -256,7 +256,7 @@ func TestServers_RebalanceServers(t *testing.T) {
|
|||||||
// Make a huge list of nodes.
|
// Make a huge list of nodes.
|
||||||
for i := 0; i < maxServers; i++ {
|
for i := 0; i < maxServers; i++ {
|
||||||
nodeName := fmt.Sprintf("s%02d", i)
|
nodeName := fmt.Sprintf("s%02d", i)
|
||||||
m.AddServer(&agent.Server{Name: nodeName})
|
m.AddServer(&metadata.Server{Name: nodeName})
|
||||||
}
|
}
|
||||||
|
|
||||||
// Keep track of how many unique shuffles we get.
|
// Keep track of how many unique shuffles we get.
|
||||||
@ -282,7 +282,7 @@ func TestServers_RebalanceServers(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// func (m *Manager) RemoveServer(server *agent.Server) {
|
// func (m *Manager) RemoveServer(server *metadata.Server) {
|
||||||
func TestManager_RemoveServer(t *testing.T) {
|
func TestManager_RemoveServer(t *testing.T) {
|
||||||
const nodeNameFmt = "s%02d"
|
const nodeNameFmt = "s%02d"
|
||||||
m := testManager()
|
m := testManager()
|
||||||
@ -293,21 +293,21 @@ func TestManager_RemoveServer(t *testing.T) {
|
|||||||
|
|
||||||
// Test removing server before its added
|
// Test removing server before its added
|
||||||
nodeName := fmt.Sprintf(nodeNameFmt, 1)
|
nodeName := fmt.Sprintf(nodeNameFmt, 1)
|
||||||
s1 := &agent.Server{Name: nodeName}
|
s1 := &metadata.Server{Name: nodeName}
|
||||||
m.RemoveServer(s1)
|
m.RemoveServer(s1)
|
||||||
m.AddServer(s1)
|
m.AddServer(s1)
|
||||||
|
|
||||||
nodeName = fmt.Sprintf(nodeNameFmt, 2)
|
nodeName = fmt.Sprintf(nodeNameFmt, 2)
|
||||||
s2 := &agent.Server{Name: nodeName}
|
s2 := &metadata.Server{Name: nodeName}
|
||||||
m.RemoveServer(s2)
|
m.RemoveServer(s2)
|
||||||
m.AddServer(s2)
|
m.AddServer(s2)
|
||||||
|
|
||||||
const maxServers = 19
|
const maxServers = 19
|
||||||
servers := make([]*agent.Server, maxServers)
|
servers := make([]*metadata.Server, maxServers)
|
||||||
// Already added two servers above
|
// Already added two servers above
|
||||||
for i := maxServers; i > 2; i-- {
|
for i := maxServers; i > 2; i-- {
|
||||||
nodeName := fmt.Sprintf(nodeNameFmt, i)
|
nodeName := fmt.Sprintf(nodeNameFmt, i)
|
||||||
server := &agent.Server{Name: nodeName}
|
server := &metadata.Server{Name: nodeName}
|
||||||
servers = append(servers, server)
|
servers = append(servers, server)
|
||||||
m.AddServer(server)
|
m.AddServer(server)
|
||||||
}
|
}
|
||||||
@ -321,7 +321,7 @@ func TestManager_RemoveServer(t *testing.T) {
|
|||||||
t.Fatalf("Expected %d servers, received %d", maxServers, m.NumServers())
|
t.Fatalf("Expected %d servers, received %d", maxServers, m.NumServers())
|
||||||
}
|
}
|
||||||
|
|
||||||
findServer := func(server *agent.Server) bool {
|
findServer := func(server *metadata.Server) bool {
|
||||||
for i := m.NumServers(); i > 0; i-- {
|
for i := m.NumServers(); i > 0; i-- {
|
||||||
s := m.FindServer()
|
s := m.FindServer()
|
||||||
if s == server {
|
if s == server {
|
||||||
@ -332,7 +332,7 @@ func TestManager_RemoveServer(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
expectedNumServers := maxServers
|
expectedNumServers := maxServers
|
||||||
removedServers := make([]*agent.Server, 0, maxServers)
|
removedServers := make([]*metadata.Server, 0, maxServers)
|
||||||
|
|
||||||
// Remove servers from the front of the list
|
// Remove servers from the front of the list
|
||||||
for i := 3; i > 0; i-- {
|
for i := 3; i > 0; i-- {
|
||||||
|
@ -6,7 +6,7 @@ import (
|
|||||||
"sort"
|
"sort"
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
"github.com/hashicorp/consul/agent/consul/agent"
|
"github.com/hashicorp/consul/agent/metadata"
|
||||||
"github.com/hashicorp/consul/agent/structs"
|
"github.com/hashicorp/consul/agent/structs"
|
||||||
"github.com/hashicorp/consul/lib"
|
"github.com/hashicorp/consul/lib"
|
||||||
"github.com/hashicorp/consul/types"
|
"github.com/hashicorp/consul/types"
|
||||||
@ -34,7 +34,7 @@ type Router struct {
|
|||||||
managers map[string][]*Manager
|
managers map[string][]*Manager
|
||||||
|
|
||||||
// routeFn is a hook to actually do the routing.
|
// routeFn is a hook to actually do the routing.
|
||||||
routeFn func(datacenter string) (*Manager, *agent.Server, bool)
|
routeFn func(datacenter string) (*Manager, *metadata.Server, bool)
|
||||||
|
|
||||||
// isShutdown prevents adding new routes to a router after it is shut
|
// isShutdown prevents adding new routes to a router after it is shut
|
||||||
// down.
|
// down.
|
||||||
@ -140,7 +140,7 @@ func (r *Router) AddArea(areaID types.AreaID, cluster RouterSerfCluster, pinger
|
|||||||
// initially, and then will quickly detect that they are failed if we
|
// initially, and then will quickly detect that they are failed if we
|
||||||
// can't reach them.
|
// can't reach them.
|
||||||
for _, m := range cluster.Members() {
|
for _, m := range cluster.Members() {
|
||||||
ok, parts := agent.IsConsulServer(m)
|
ok, parts := metadata.IsConsulServer(m)
|
||||||
if !ok {
|
if !ok {
|
||||||
r.logger.Printf("[WARN]: consul: Non-server %q in server-only area %q",
|
r.logger.Printf("[WARN]: consul: Non-server %q in server-only area %q",
|
||||||
m.Name, areaID)
|
m.Name, areaID)
|
||||||
@ -206,7 +206,7 @@ func (r *Router) RemoveArea(areaID types.AreaID) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// addServer does the work of AddServer once the write lock is held.
|
// addServer does the work of AddServer once the write lock is held.
|
||||||
func (r *Router) addServer(area *areaInfo, s *agent.Server) error {
|
func (r *Router) addServer(area *areaInfo, s *metadata.Server) error {
|
||||||
// Make the manager on the fly if this is the first we've seen of it,
|
// Make the manager on the fly if this is the first we've seen of it,
|
||||||
// and add it to the index.
|
// and add it to the index.
|
||||||
info, ok := area.managers[s.Datacenter]
|
info, ok := area.managers[s.Datacenter]
|
||||||
@ -236,7 +236,7 @@ func (r *Router) addServer(area *areaInfo, s *agent.Server) error {
|
|||||||
|
|
||||||
// AddServer should be called whenever a new server joins an area. This is
|
// AddServer should be called whenever a new server joins an area. This is
|
||||||
// typically hooked into the Serf event handler area for this area.
|
// typically hooked into the Serf event handler area for this area.
|
||||||
func (r *Router) AddServer(areaID types.AreaID, s *agent.Server) error {
|
func (r *Router) AddServer(areaID types.AreaID, s *metadata.Server) error {
|
||||||
r.Lock()
|
r.Lock()
|
||||||
defer r.Unlock()
|
defer r.Unlock()
|
||||||
|
|
||||||
@ -249,7 +249,7 @@ func (r *Router) AddServer(areaID types.AreaID, s *agent.Server) error {
|
|||||||
|
|
||||||
// RemoveServer should be called whenever a server is removed from an area. This
|
// RemoveServer should be called whenever a server is removed from an area. This
|
||||||
// is typically hooked into the Serf event handler area for this area.
|
// is typically hooked into the Serf event handler area for this area.
|
||||||
func (r *Router) RemoveServer(areaID types.AreaID, s *agent.Server) error {
|
func (r *Router) RemoveServer(areaID types.AreaID, s *metadata.Server) error {
|
||||||
r.Lock()
|
r.Lock()
|
||||||
defer r.Unlock()
|
defer r.Unlock()
|
||||||
|
|
||||||
@ -282,7 +282,7 @@ func (r *Router) RemoveServer(areaID types.AreaID, s *agent.Server) error {
|
|||||||
// is typically hooked into the Serf event handler area for this area. We will
|
// is typically hooked into the Serf event handler area for this area. We will
|
||||||
// immediately shift traffic away from this server, but it will remain in the
|
// immediately shift traffic away from this server, but it will remain in the
|
||||||
// list of servers.
|
// list of servers.
|
||||||
func (r *Router) FailServer(areaID types.AreaID, s *agent.Server) error {
|
func (r *Router) FailServer(areaID types.AreaID, s *metadata.Server) error {
|
||||||
r.RLock()
|
r.RLock()
|
||||||
defer r.RUnlock()
|
defer r.RUnlock()
|
||||||
|
|
||||||
@ -309,13 +309,13 @@ func (r *Router) FailServer(areaID types.AreaID, s *agent.Server) error {
|
|||||||
// connection attempt. If any problem occurs with the given server, the caller
|
// connection attempt. If any problem occurs with the given server, the caller
|
||||||
// should feed that back to the manager associated with the server, which is
|
// should feed that back to the manager associated with the server, which is
|
||||||
// also returned, by calling NofifyFailedServer().
|
// also returned, by calling NofifyFailedServer().
|
||||||
func (r *Router) FindRoute(datacenter string) (*Manager, *agent.Server, bool) {
|
func (r *Router) FindRoute(datacenter string) (*Manager, *metadata.Server, bool) {
|
||||||
return r.routeFn(datacenter)
|
return r.routeFn(datacenter)
|
||||||
}
|
}
|
||||||
|
|
||||||
// findDirectRoute looks for a route to the given datacenter if it's directly
|
// findDirectRoute looks for a route to the given datacenter if it's directly
|
||||||
// adjacent to the server.
|
// adjacent to the server.
|
||||||
func (r *Router) findDirectRoute(datacenter string) (*Manager, *agent.Server, bool) {
|
func (r *Router) findDirectRoute(datacenter string) (*Manager, *metadata.Server, bool) {
|
||||||
r.RLock()
|
r.RLock()
|
||||||
defer r.RUnlock()
|
defer r.RUnlock()
|
||||||
|
|
||||||
@ -399,7 +399,7 @@ func (r *Router) GetDatacentersByDistance() ([]string, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
for _, m := range info.cluster.Members() {
|
for _, m := range info.cluster.Members() {
|
||||||
ok, parts := agent.IsConsulServer(m)
|
ok, parts := metadata.IsConsulServer(m)
|
||||||
if !ok {
|
if !ok {
|
||||||
r.logger.Printf("[WARN]: consul: Non-server %q in server-only area %q",
|
r.logger.Printf("[WARN]: consul: Non-server %q in server-only area %q",
|
||||||
m.Name, areaID)
|
m.Name, areaID)
|
||||||
@ -460,7 +460,7 @@ func (r *Router) GetDatacenterMaps() ([]structs.DatacenterMap, error) {
|
|||||||
for areaID, info := range r.areas {
|
for areaID, info := range r.areas {
|
||||||
index := make(map[string]structs.Coordinates)
|
index := make(map[string]structs.Coordinates)
|
||||||
for _, m := range info.cluster.Members() {
|
for _, m := range info.cluster.Members() {
|
||||||
ok, parts := agent.IsConsulServer(m)
|
ok, parts := metadata.IsConsulServer(m)
|
||||||
if !ok {
|
if !ok {
|
||||||
r.logger.Printf("[WARN]: consul: Non-server %q in server-only area %q",
|
r.logger.Printf("[WARN]: consul: Non-server %q in server-only area %q",
|
||||||
m.Name, areaID)
|
m.Name, areaID)
|
||||||
|
@ -3,13 +3,13 @@ package router
|
|||||||
import (
|
import (
|
||||||
"log"
|
"log"
|
||||||
|
|
||||||
"github.com/hashicorp/consul/agent/consul/agent"
|
"github.com/hashicorp/consul/agent/metadata"
|
||||||
"github.com/hashicorp/consul/types"
|
"github.com/hashicorp/consul/types"
|
||||||
"github.com/hashicorp/serf/serf"
|
"github.com/hashicorp/serf/serf"
|
||||||
)
|
)
|
||||||
|
|
||||||
// routerFn selects one of the router operations to map to incoming Serf events.
|
// routerFn selects one of the router operations to map to incoming Serf events.
|
||||||
type routerFn func(types.AreaID, *agent.Server) error
|
type routerFn func(types.AreaID, *metadata.Server) error
|
||||||
|
|
||||||
// handleMemberEvents attempts to apply the given Serf member event to the given
|
// handleMemberEvents attempts to apply the given Serf member event to the given
|
||||||
// router function.
|
// router function.
|
||||||
@ -21,7 +21,7 @@ func handleMemberEvent(logger *log.Logger, fn routerFn, areaID types.AreaID, e s
|
|||||||
}
|
}
|
||||||
|
|
||||||
for _, m := range me.Members {
|
for _, m := range me.Members {
|
||||||
ok, parts := agent.IsConsulServer(m)
|
ok, parts := metadata.IsConsulServer(m)
|
||||||
if !ok {
|
if !ok {
|
||||||
logger.Printf("[WARN]: consul: Non-server %q in server-only area %q",
|
logger.Printf("[WARN]: consul: Non-server %q in server-only area %q",
|
||||||
m.Name, areaID)
|
m.Name, areaID)
|
||||||
|
@ -6,13 +6,13 @@ import (
|
|||||||
"net"
|
"net"
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
"github.com/hashicorp/consul/agent/consul/agent"
|
"github.com/hashicorp/consul/agent/metadata"
|
||||||
"github.com/hashicorp/serf/serf"
|
"github.com/hashicorp/serf/serf"
|
||||||
)
|
)
|
||||||
|
|
||||||
// FloodPortFn gets the port to use for a given server when flood-joining. This
|
// FloodPortFn gets the port to use for a given server when flood-joining. This
|
||||||
// will return false if it doesn't have one.
|
// will return false if it doesn't have one.
|
||||||
type FloodPortFn func(*agent.Server) (int, bool)
|
type FloodPortFn func(*metadata.Server) (int, bool)
|
||||||
|
|
||||||
// FloodJoins attempts to make sure all Consul servers in the local Serf
|
// FloodJoins attempts to make sure all Consul servers in the local Serf
|
||||||
// instance are joined in the global Serf instance. It assumes names in the
|
// instance are joined in the global Serf instance. It assumes names in the
|
||||||
@ -27,9 +27,9 @@ func FloodJoins(logger *log.Logger, portFn FloodPortFn,
|
|||||||
|
|
||||||
// Index the global side so we can do one pass through the local side
|
// Index the global side so we can do one pass through the local side
|
||||||
// with cheap lookups.
|
// with cheap lookups.
|
||||||
index := make(map[string]*agent.Server)
|
index := make(map[string]*metadata.Server)
|
||||||
for _, m := range globalSerf.Members() {
|
for _, m := range globalSerf.Members() {
|
||||||
ok, server := agent.IsConsulServer(m)
|
ok, server := metadata.IsConsulServer(m)
|
||||||
if !ok {
|
if !ok {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
@ -48,7 +48,7 @@ func FloodJoins(logger *log.Logger, portFn FloodPortFn,
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
ok, server := agent.IsConsulServer(m)
|
ok, server := metadata.IsConsulServer(m)
|
||||||
if !ok {
|
if !ok {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user