Complete logic for sending coordinates

This commit is contained in:
Derek Chiang 2015-04-15 19:12:45 -04:00 committed by James Phillips
parent 8a0bb40bba
commit 98d87b5dd5
5 changed files with 54 additions and 12 deletions

View File

@ -38,6 +38,9 @@ const (
"but no reason was provided. This is a default message." "but no reason was provided. This is a default message."
defaultServiceMaintReason = "Maintenance mode is enabled for this " + defaultServiceMaintReason = "Maintenance mode is enabled for this " +
"service, but no reason was provided. This is a default message." "service, but no reason was provided. This is a default message."
// An interval used to send network coordinates to servers
syncCoordinateStaggerIntv = 15 * time.Second
) )
var ( var (
@ -557,20 +560,39 @@ func (a *Agent) ResumeSync() {
a.state.Resume() a.state.Resume()
} }
// SendCoordinates starts a goroutine that periodically sends the local coordinate // SendCoordinates starts a loop that periodically sends the local coordinate
// to a server // to a server
func (a *Agent) SendCoordinates() { func (a *Agent) SendCoordinates(shutdownCh chan struct{}) {
for {
intv := aeScale(a.config.SyncCoordinateInterval, len(a.LANMembers()))
intv = intv + randomStagger(intv)
timer := time.After(intv)
select {
case <-timer:
var c coordinate.Coordinate var c coordinate.Coordinate
if a.config.Server { if a.config.Server {
c = a.server c = a.server.GetLANCoordinate()
} else {
c = a.client.GetCoordinate()
} }
req := structs.CoordinateUpdateRequest{ req := structs.CoordinateUpdateRequest{
NodeSpecificRequest: NodeSpecificRequest{ NodeSpecificRequest: NodeSpecificRequest{
Datacenter: a.config.Datacenter, Datacenter: a.config.Datacenter,
Node: a.config.NodeName, Node: a.config.NodeName,
}, },
Op: structs.CoordinateSet,
Coord: c,
QueryOptions: structs.QueryOptions{Token: a.config.ACLToken}, QueryOptions: structs.QueryOptions{Token: a.config.ACLToken},
} }
var reply struct{}
if err := a.RPC("Coordinate.Update", &arg, &reply); err != nil {
a.logger.Printf("[ERR] coordinate update error: %s", err.Error())
}
case <-shutdownCh:
return
}
}
} }
// persistService saves a service definition to a JSON file in the data dir // persistService saves a service definition to a JSON file in the data dir

View File

@ -709,6 +709,9 @@ func (c *Command) Run(args []string) int {
errWanCh := make(chan struct{}) errWanCh := make(chan struct{})
go c.retryJoinWan(config, errWanCh) go c.retryJoinWan(config, errWanCh)
// Start sending network coordinates to servers
go c.agent.SendCoordinates(c.agent.shutdownCh)
// Wait for exit // Wait for exit
return c.handleSignals(config, errCh, errWanCh) return c.handleSignals(config, errCh, errWanCh)
} }

View File

@ -372,6 +372,10 @@ type Config struct {
// representation of our state. Defaults to every 60s. // representation of our state. Defaults to every 60s.
AEInterval time.Duration `mapstructure:"-" json:"-"` AEInterval time.Duration `mapstructure:"-" json:"-"`
// SyncCoordinateInterval controls the interval for sending network coordinates
// to servers.
SyncCoordinateInterval time.Duration `mapstructure:"-" json:"-"`
// Checks holds the provided check definitions // Checks holds the provided check definitions
Checks []*CheckDefinition `mapstructure:"-" json:"-"` Checks []*CheckDefinition `mapstructure:"-" json:"-"`
@ -466,6 +470,7 @@ func DefaultConfig() *Config {
Protocol: consul.ProtocolVersionMax, Protocol: consul.ProtocolVersionMax,
CheckUpdateInterval: 5 * time.Minute, CheckUpdateInterval: 5 * time.Minute,
AEInterval: time.Minute, AEInterval: time.Minute,
SyncCoordinateInterval: 15 * time.Second,
ACLTTL: 30 * time.Second, ACLTTL: 30 * time.Second,
ACLDownPolicy: "extend-cache", ACLDownPolicy: "extend-cache",
ACLDefaultPolicy: "allow", ACLDefaultPolicy: "allow",

View File

@ -12,6 +12,7 @@ import (
"time" "time"
"github.com/hashicorp/consul/consul/structs" "github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/serf/coordinate"
"github.com/hashicorp/serf/serf" "github.com/hashicorp/serf/serf"
) )
@ -376,3 +377,8 @@ func (c *Client) Stats() map[string]map[string]string {
} }
return stats return stats
} }
// GetCoordinate returns the network coordinate of the receiver
func (c *Client) GetCoordinate() *coordinate.Coordinate {
return c.serf.GetCoordinate()
}

View File

@ -19,6 +19,7 @@ import (
"github.com/hashicorp/consul/tlsutil" "github.com/hashicorp/consul/tlsutil"
"github.com/hashicorp/raft" "github.com/hashicorp/raft"
"github.com/hashicorp/raft-boltdb" "github.com/hashicorp/raft-boltdb"
"github.com/hashicorp/serf/coordinate"
"github.com/hashicorp/serf/serf" "github.com/hashicorp/serf/serf"
) )
@ -693,3 +694,8 @@ func (s *Server) Stats() map[string]map[string]string {
} }
return stats return stats
} }
// GetLANCoordinate returns the network coordinate of the receiver
func (s *Server) GetLANCoordinate() *coordinate.Coordinate {
return s.serfLAN.GetCoordinate()
}