Adds router into RPC paths with work in progress on coordinates.

This commit is contained in:
James Phillips 2017-03-13 18:54:34 -07:00
parent 78b62ca406
commit 472f1bd63e
No known key found for this signature in database
GPG Key ID: 77183E682AC5FC11
12 changed files with 415 additions and 153 deletions

View File

@ -10,10 +10,11 @@ type CoordinateEntry struct {
Coord *coordinate.Coordinate
}
// CoordinateDatacenterMap represents a datacenter and its associated WAN
// nodes and their associates coordinates.
// CoordinateDatacenterMap has the coordinates for servers in a given datacenter
// and area. Network coordinates are only compatible within the same area.
type CoordinateDatacenterMap struct {
Datacenter string
AreaID string
Coordinates []CoordinateEntry
}

View File

@ -2,8 +2,6 @@ package consul
import (
"fmt"
"sort"
"strings"
"sync"
"time"
@ -143,25 +141,9 @@ func (c *Coordinate) Update(args *structs.CoordinateUpdateRequest, reply *struct
// and the raw coordinates of those nodes (if no coordinates are available for
// any of the nodes, the node list may be empty).
func (c *Coordinate) ListDatacenters(args *struct{}, reply *[]structs.DatacenterMap) error {
c.srv.remoteLock.RLock()
defer c.srv.remoteLock.RUnlock()
// Build up a map of all the DCs, sort it first since getDatacenterMaps
// will preserve the order of this list in the output.
dcs := make([]string, 0, len(c.srv.remoteConsuls))
for dc := range c.srv.remoteConsuls {
dcs = append(dcs, dc)
}
sort.Strings(dcs)
maps := c.srv.getDatacenterMaps(dcs)
// Strip the datacenter suffixes from all the node names.
for i := range maps {
suffix := fmt.Sprintf(".%s", maps[i].Datacenter)
for j := range maps[i].Coordinates {
node := maps[i].Coordinates[j].Node
maps[i].Coordinates[j].Node = strings.TrimSuffix(node, suffix)
}
maps, err := c.srv.router.GetDatacenterMaps()
if err != nil {
return err
}
*reply = maps

View File

@ -4,7 +4,6 @@ import (
"crypto/tls"
"fmt"
"io"
"math/rand"
"net"
"strings"
"time"
@ -266,31 +265,22 @@ func (s *Server) forwardLeader(server *agent.Server, method string, args interfa
return s.connPool.RPC(s.config.Datacenter, server.Addr.String(), server.Version, method, args, reply)
}
// getRemoteServer returns a random server from a remote datacenter. This uses
// the bool parameter to signal that none were available.
func (s *Server) getRemoteServer(dc string) (*agent.Server, bool) {
s.remoteLock.RLock()
defer s.remoteLock.RUnlock()
servers := s.remoteConsuls[dc]
if len(servers) == 0 {
return nil, false
}
offset := rand.Int31n(int32(len(servers)))
server := servers[offset]
return server, true
}
// forwardDC is used to forward an RPC call to a remote DC, or fail if no servers
func (s *Server) forwardDC(method, dc string, args interface{}, reply interface{}) error {
server, ok := s.getRemoteServer(dc)
manager, server, ok := s.router.FindRoute(dc)
if !ok {
s.logger.Printf("[WARN] consul.rpc: RPC request for DC '%s', no path found", dc)
s.logger.Printf("[WARN] consul.rpc: RPC request for DC %q, no path found", dc)
return structs.ErrNoDCPath
}
metrics.IncrCounter([]string{"consul", "rpc", "cross-dc", dc}, 1)
return s.connPool.RPC(dc, server.Addr.String(), server.Version, method, args, reply)
if err := s.connPool.RPC(dc, server.Addr.String(), server.Version, method, args, reply); err != nil {
manager.NotifyFailedServer(server)
s.logger.Printf("[ERR] consul: RPC failed to server %s in DC %q: %v", server.Addr, dc, err)
return err
}
return nil
}
// globalRPC is used to forward an RPC request to one server in each datacenter.
@ -303,12 +293,7 @@ func (s *Server) globalRPC(method string, args interface{},
respCh := make(chan interface{})
// Make a new request into each datacenter
s.remoteLock.RLock()
dcs := make([]string, 0, len(s.remoteConsuls))
for dc, _ := range s.remoteConsuls {
dcs = append(dcs, dc)
}
s.remoteLock.RUnlock()
dcs := s.router.GetDatacenters()
for _, dc := range dcs {
go func(dc string) {
rr := reply.New()
@ -320,7 +305,7 @@ func (s *Server) globalRPC(method string, args interface{},
}(dc)
}
replies, total := 0, len(s.remoteConsuls)
replies, total := 0, len(dcs)
for replies < total {
select {
case err := <-errorCh:

View File

@ -70,30 +70,6 @@ func (s *Server) lanEventHandler() {
}
}
// wanEventHandler is used to handle events from the wan Serf cluster
func (s *Server) wanEventHandler() {
for {
select {
case e := <-s.eventChWAN:
switch e.EventType() {
case serf.EventMemberJoin:
s.wanNodeJoin(e.(serf.MemberEvent))
case serf.EventMemberLeave, serf.EventMemberFailed:
s.wanNodeFailed(e.(serf.MemberEvent))
case serf.EventMemberUpdate: // Ignore
case serf.EventMemberReap: // Ignore
case serf.EventUser:
case serf.EventQuery: // Ignore
default:
s.logger.Printf("[WARN] consul: Unhandled WAN Serf Event: %#v", e)
}
case <-s.shutdownCh:
return
}
}
}
// localMemberEvent is used to reconcile Serf events with the strongly
// consistent store if we are the current leader
func (s *Server) localMemberEvent(me serf.MemberEvent) {
@ -169,36 +145,6 @@ func (s *Server) lanNodeJoin(me serf.MemberEvent) {
}
}
// wanNodeJoin is used to handle join events on the WAN pool.
func (s *Server) wanNodeJoin(me serf.MemberEvent) {
for _, m := range me.Members {
ok, parts := agent.IsConsulServer(m)
if !ok {
s.logger.Printf("[WARN] consul: Non-server in WAN pool: %s", m.Name)
continue
}
s.logger.Printf("[INFO] consul: Adding WAN server %s", parts)
// Search for this node in our existing remotes.
found := false
s.remoteLock.Lock()
existing := s.remoteConsuls[parts.Datacenter]
for idx, e := range existing {
if e.Name == parts.Name {
existing[idx] = parts
found = true
break
}
}
// Add to the list if not known.
if !found {
s.remoteConsuls[parts.Datacenter] = append(existing, parts)
}
s.remoteLock.Unlock()
}
}
// maybeBootstrap is used to handle bootstrapping when a new consul server joins.
func (s *Server) maybeBootstrap() {
// Bootstrap can only be done if there are no committed logs, remove our
@ -327,35 +273,3 @@ func (s *Server) lanNodeFailed(me serf.MemberEvent) {
s.localLock.Unlock()
}
}
// wanNodeFailed is used to handle fail events on the WAN pool.
func (s *Server) wanNodeFailed(me serf.MemberEvent) {
for _, m := range me.Members {
ok, parts := agent.IsConsulServer(m)
if !ok {
continue
}
s.logger.Printf("[INFO] consul: Removing WAN server %s", parts)
// Remove the server if known
s.remoteLock.Lock()
existing := s.remoteConsuls[parts.Datacenter]
n := len(existing)
for i := 0; i < n; i++ {
if existing[i].Name == parts.Name {
existing[i], existing[n-1] = existing[n-1], nil
existing = existing[:n-1]
n--
break
}
}
// Trim the list if all known consuls are dead
if n == 0 {
delete(s.remoteConsuls, parts.Datacenter)
} else {
s.remoteConsuls[parts.Datacenter] = existing
}
s.remoteLock.Unlock()
}
}

View File

@ -18,10 +18,12 @@ import (
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/consul/agent"
"github.com/hashicorp/consul/consul/servers"
"github.com/hashicorp/consul/consul/state"
"github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/consul/lib"
"github.com/hashicorp/consul/tlsutil"
"github.com/hashicorp/consul/types"
"github.com/hashicorp/raft"
"github.com/hashicorp/raft-boltdb"
"github.com/hashicorp/serf/coordinate"
@ -140,6 +142,10 @@ type Server struct {
remoteConsuls map[string][]*agent.Server
remoteLock sync.RWMutex
// router is used to map out Consul servers in the WAN and in Consul
// Enterprise user-defined areas.
router *servers.Router
// rpcListener is used to listen for incoming connections
rpcListener net.Listener
rpcServer *rpc.Server
@ -236,6 +242,9 @@ func NewServer(config *Config) (*Server, error) {
return nil, err
}
// Create the shutdown channel - this is closed but never written to.
shutdownCh := make(chan struct{})
// Create server.
s := &Server{
autopilotRemoveDeadCh: make(chan struct{}),
@ -248,6 +257,7 @@ func NewServer(config *Config) (*Server, error) {
logger: logger,
reconcileCh: make(chan serf.Member, 32),
remoteConsuls: make(map[string][]*agent.Server, 4),
router: servers.NewRouter(loogger, shutdownCh),
rpcServer: rpc.NewServer(),
rpcTLS: incomingTLS,
tombstoneGC: gc,
@ -290,7 +300,7 @@ func NewServer(config *Config) (*Server, error) {
s.eventChLAN, serfLANSnapshot, false)
if err != nil {
s.Shutdown()
return nil, fmt.Errorf("Failed to start lan serf: %v", err)
return nil, fmt.Errorf("Failed to start LAN Serf: %v", err)
}
go s.lanEventHandler()
@ -299,9 +309,15 @@ func NewServer(config *Config) (*Server, error) {
s.eventChWAN, serfWANSnapshot, true)
if err != nil {
s.Shutdown()
return nil, fmt.Errorf("Failed to start wan serf: %v", err)
return nil, fmt.Errorf("Failed to start WAN Serf: %v", err)
}
go s.wanEventHandler()
// Add a "static route" to the WAN Serf and hook it up to Serf events.
if err := s.router.AddArea(types.AreaWAN, s.serfWAN, s.connPool); err != nil {
s.Shutdown()
return nil, fmt.Errorf("Failed to add WAN serf route: %v", err)
}
go servers.HandleSerfEvents(s.logger, s.router, types.AreaWAN, s.serfWAN.ShutdownCh(), s.eventChWAN)
// Start monitoring leadership. This must happen after Serf is set up
// since it can fire events when leadership is obtained.
@ -602,6 +618,9 @@ func (s *Server) Shutdown() error {
if s.serfWAN != nil {
s.serfWAN.Shutdown()
if err := s.router.RemoveArea(types.AreaWAN); err != nil {
s.logger.Printf("[WARN] consul: error removing WAN area: %v", err)
}
}
if s.raft != nil {
@ -881,9 +900,7 @@ func (s *Server) Stats() map[string]map[string]string {
toString := func(v uint64) string {
return strconv.FormatUint(v, 10)
}
s.remoteLock.RLock()
numKnownDCs := len(s.remoteConsuls)
s.remoteLock.RUnlock()
numKnownDCs := len(s.router.GetDatacenters())
stats := map[string]map[string]string{
"consul": map[string]string{
"server": "true",

View File

@ -50,9 +50,9 @@ const (
newRebalanceConnsPerSecPerServer = 64
)
// ConsulClusterInfo is an interface wrapper around serf in order to prevent
// a cyclic import dependency.
type ConsulClusterInfo interface {
// ManagerSerfCluster is an interface wrapper around Serf in order to make this
// easier to unit test.
type ManagerSerfCluster interface {
NumNodes() int
}
@ -88,8 +88,8 @@ type Manager struct {
// clusterInfo is used to estimate the approximate number of nodes in
// a cluster and limit the rate at which it rebalances server
// connections. ConsulClusterInfo is an interface that wraps serf.
clusterInfo ConsulClusterInfo
// connections. ManagerSerfCluster is an interface that wraps serf.
clusterInfo ManagerSerfCluster
// connPoolPinger is used to test the health of a server in the
// connection pool. Pinger is an interface that wraps
@ -214,7 +214,7 @@ func (m *Manager) saveServerList(l serverList) {
}
// New is the only way to safely create a new Manager struct.
func New(logger *log.Logger, shutdownCh chan struct{}, clusterInfo ConsulClusterInfo, connPoolPinger Pinger) (m *Manager) {
func New(logger *log.Logger, shutdownCh chan struct{}, clusterInfo ManagerSerfCluster, connPoolPinger Pinger) (m *Manager) {
m = new(Manager)
m.logger = logger
m.clusterInfo = clusterInfo // can't pass *consul.Client: import cycle

267
consul/servers/router.go Normal file
View File

@ -0,0 +1,267 @@
package servers
import (
"fmt"
"log"
"sync"
"github.com/hashicorp/consul/consul/agent"
"github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/consul/types"
"github.com/hashicorp/serf/coordinate"
"github.com/hashicorp/serf/serf"
)
type Router struct {
logger *log.Logger
areas map[types.AreaID]*areaInfo
managers map[string][]*Manager
// This top-level lock covers all the internal state.
sync.RWMutex
}
// RouterSerfCluster is an interface wrapper around Serf in order to make this
// easier to unit test.
type RouterSerfCluster interface {
NumNodes() int
Members() []serf.Member
GetCoordinate() (*coordinate.Coordinate, error)
GetCachedCoordinate(name string) (coord *coordinate.Coordinate, ok bool)
}
type managerInfo struct {
manager *Manager
shutdownCh chan struct{}
}
type areaInfo struct {
cluster RouterSerfCluster
pinger Pinger
managers map[string]*managerInfo
}
func NewRouter(logger *log.Logger, shutdownCh chan struct{}) *Router {
router := &Router{
logger: logger,
areas: make(map[types.AreaID]*areaInfo),
managers: make(map[string][]*Manager),
}
// This will propagate a top-level shutdown to all the managers.
go func() {
<-shutdownCh
router.Lock()
defer router.Unlock()
for _, area := range router.areas {
for _, info := range area.managers {
close(info.shutdownCh)
}
}
router.areas = nil
router.managers = nil
}()
return router
}
func (r *Router) AddArea(areaID types.AreaID, cluster RouterSerfCluster, pinger Pinger) error {
r.Lock()
defer r.Unlock()
if _, ok := r.areas[areaID]; ok {
return fmt.Errorf("area ID %q already exists", areaID)
}
r.areas[areaID] = &areaInfo{
cluster: cluster,
pinger: pinger,
managers: make(map[string]*managerInfo),
}
return nil
}
// removeManagerFromIndex does cleanup to take a manager out of the index of
// datacenters. This assumes the lock is already held for writing, and will
// panic if the given manager isn't found.
func (r *Router) removeManagerFromIndex(datacenter string, manager *Manager) {
managers := r.managers[datacenter]
for i := 0; i < len(managers); i++ {
if managers[i] == manager {
r.managers[datacenter] = append(managers[:i], managers[i+1:]...)
return
}
}
panic("managers index out of sync")
}
func (r *Router) RemoveArea(areaID types.AreaID) error {
r.Lock()
defer r.Unlock()
area, ok := r.areas[areaID]
if !ok {
return fmt.Errorf("area ID %q does not exist", areaID)
}
// Remove all of this area's managers from the index and shut them down.
for datacenter, info := range area.managers {
r.removeManagerFromIndex(datacenter, info.manager)
close(info.shutdownCh)
}
delete(r.areas, areaID)
return nil
}
func (r *Router) AddServer(areaID types.AreaID, s *agent.Server) error {
r.Lock()
defer r.Unlock()
area, ok := r.areas[areaID]
if !ok {
return fmt.Errorf("area ID %q does not exist", areaID)
}
// Make the manager on the fly if this is the first we've seen of it,
// and add it to the index.
info, ok := area.managers[s.Datacenter]
if !ok {
shutdownCh := make(chan struct{})
manager := New(r.logger, shutdownCh, area.cluster, area.pinger)
info = &managerInfo{
manager: manager,
shutdownCh: shutdownCh,
}
managers := r.managers[s.Datacenter]
r.managers[s.Datacenter] = append(managers, manager)
}
info.manager.AddServer(s)
return nil
}
func (r *Router) RemoveServer(areaID types.AreaID, s *agent.Server) error {
r.Lock()
defer r.Unlock()
area, ok := r.areas[areaID]
if !ok {
return fmt.Errorf("area ID %q does not exist", areaID)
}
// If the manager has already been removed we just quietly exit. This
// can get called by Serf events, so the timing isn't totally
// deterministic.
info, ok := area.managers[s.Datacenter]
if !ok {
return nil
}
info.manager.RemoveServer(s)
// If this manager is empty then remove it so we don't accumulate cruft
// and waste time during request routing.
if num := info.manager.NumServers(); num == 0 {
r.removeManagerFromIndex(s.Datacenter, info.manager)
close(info.shutdownCh)
delete(area.managers, s.Datacenter)
}
return nil
}
func (r *Router) FailServer(areaID types.AreaID, s *agent.Server) error {
r.RLock()
defer r.RUnlock()
area, ok := r.areas[areaID]
if !ok {
return fmt.Errorf("area ID %q does not exist", areaID)
}
// If the manager has already been removed we just quietly exit. This
// can get called by Serf events, so the timing isn't totally
// deterministic.
info, ok := area.managers[s.Datacenter]
if !ok {
return nil
}
info.manager.NotifyFailedServer(s)
return nil
}
func (r *Router) GetDatacenters() []string {
r.RLock()
defer r.RUnlock()
dcs := make([]string, 0, len(r.managers))
for dc, _ := range r.managers {
dcs = append(dcs, dc)
}
return dcs
}
func (r *Router) GetDatacenterMaps() ([]structs.DatacenterMap, error) {
r.RLock()
defer r.RUnlock()
var maps []structs.DatacenterMap
for areaID, info := range r.areas {
index := make(map[string]structs.Coordinates)
for _, m := range info.cluster.Members() {
ok, parts := agent.IsConsulServer(m)
if !ok {
r.logger.Printf("[WARN]: consul: Non-server %q in server-only area %q",
m.Name, areaID)
continue
}
coord, ok := info.cluster.GetCachedCoordinate(parts.Name)
if ok {
entry := &structs.Coordinate{
Node: parts.Name,
Coord: coord,
}
existing := index[parts.Datacenter]
index[parts.Datacenter] = append(existing, entry)
}
}
for dc, coords := range index {
entry := structs.DatacenterMap{
Datacenter: dc,
AreaID: areaID,
Coordinates: coords,
}
maps = append(maps, entry)
}
}
return maps, nil
}
func (r *Router) FindRoute(datacenter string) (*Manager, *agent.Server, bool) {
r.RLock()
defer r.RUnlock()
// Get the list of managers for this datacenter. This will usually just
// have one entry, but it's possible to have a user-defined area + WAN.
managers, ok := r.managers[datacenter]
if !ok {
return nil, nil, false
}
// Try each manager until we get a server.
for _, manager := range managers {
if s := manager.FindServer(); s != nil {
return manager, s, true
}
}
// Didn't find a route (even via an unhealthy server).
return nil, nil, false
}

View File

@ -0,0 +1,73 @@
package servers
import (
"log"
"github.com/hashicorp/consul/consul/agent"
"github.com/hashicorp/consul/types"
"github.com/hashicorp/serf/serf"
)
// routerFn selects one of the router operations to map to incoming Serf events.
type routerFn func(types.AreaID, *agent.Server) error
// handleMemberEvents attempts to apply the given Serf member event to the given
// router function.
func handleMemberEvent(logger *log.Logger, fn routerFn, areaID types.AreaID, e serf.Event) {
me, ok := e.(serf.MemberEvent)
if !ok {
logger.Printf("[ERR] consul: Bad event type %#v", e)
return
}
for _, m := range me.Members {
ok, parts := agent.IsConsulServer(m)
if !ok {
logger.Printf("[WARN]: consul: Non-server %q in server-only area %q",
m.Name, areaID)
continue
}
if err := fn(areaID, parts); err != nil {
logger.Printf("[ERR] consul: Failed to process %s event for server %q in area %q: %v",
me.Type.String(), m.Name, areaID, err)
continue
}
logger.Printf("[INFO] consul: Handled %s event for server %q in area %q",
me.Type.String(), m.Name, areaID)
}
}
// HandleSerfEvents is a long-running goroutine that pushes incoming events from
// a Serf manager's channel into the given router. This will return when the
// shutdown channel is closed.
func HandleSerfEvents(logger *log.Logger, router *Router, areaID types.AreaID, shutdownCh <-chan struct{}, eventCh <-chan serf.Event) {
for {
select {
case <-shutdownCh:
return
case e := <-eventCh:
switch e.EventType() {
case serf.EventMemberJoin:
handleMemberEvent(logger, router.AddServer, areaID, e)
case serf.EventMemberLeave:
handleMemberEvent(logger, router.RemoveServer, areaID, e)
case serf.EventMemberFailed:
handleMemberEvent(logger, router.FailServer, areaID, e)
// All of these event types are ignored.
case serf.EventMemberUpdate:
case serf.EventMemberReap:
case serf.EventUser:
case serf.EventQuery:
default:
logger.Printf("[WARN] consul: Unhandled Serf Event: %#v", e)
}
}
}
}

View File

@ -30,11 +30,18 @@ func (s *Server) dispatchSnapshotRequest(args *structs.SnapshotRequest, in io.Re
// Perform DC forwarding.
if dc := args.Datacenter; dc != s.config.Datacenter {
server, ok := s.getRemoteServer(dc)
manager, server, ok := s.router.FindRoute(dc)
if !ok {
return nil, structs.ErrNoDCPath
}
return SnapshotRPC(s.connPool, dc, server.Addr.String(), args, in, reply)
snap, err := SnapshotRPC(s.connPool, dc, server.Addr.String(), args, in, reply)
if err != nil {
manager.NotifyFailedServer(server)
return nil, err
}
return snap, nil
}
// Perform leader forwarding if required.

View File

@ -901,9 +901,11 @@ type IndexedCoordinates struct {
}
// DatacenterMap is used to represent a list of nodes with their raw coordinates,
// associated with a datacenter.
// associated with a datacenter. Coordinates are only compatible between nodes in
// the same area.
type DatacenterMap struct {
Datacenter string
AreaID types.AreaID
Coordinates Coordinates
}

9
types/area.go Normal file
View File

@ -0,0 +1,9 @@
package types
// AreaID is a strongly-typed string used to uniquely represent a network area,
// which is a relationship between Consul servers.
type AreaID string
// This represents the existing WAN area that's built in to Consul. Consul
// Enterprise generalizes areas, which are represented with UUIDs.
const AreaWAN AreaID = "WAN"

View File

@ -34,6 +34,7 @@ It returns a JSON body like this:
[
{
"Datacenter": "dc1",
"AreaID": "WAN",
"Coordinates": [
{
"Node": "agent-one",
@ -49,9 +50,13 @@ It returns a JSON body like this:
]
```
This endpoint serves data out of the server's local Serf data about the WAN, so
its results may vary as requests are handled by different servers in the
cluster. Also, it does not support blocking queries or any consistency modes.
This endpoint serves data out of the server's local Serf data, so its results may
vary as requests are handled by different servers in the cluster. In Consul
Enterprise, this will include coordinates for user-added network areas as well,
as indicated by the `AreaID`. Coordinates are only compatible within the same
area.
This endpoint does not support blocking queries or any consistency modes.
### <a name=""coordinate_nodes></a> /v1/coordinate/nodes