Does a clean up pass on the Consul side.

This commit is contained in:
James Phillips 2015-06-05 20:31:33 -07:00
parent 01d2452ea3
commit 86b112fe31
12 changed files with 223 additions and 261 deletions

View File

@ -192,15 +192,17 @@ func Create(config *Config, logOutput io.Writer) (*Agent, error) {
// Start handling events // Start handling events
go agent.handleEvents() go agent.handleEvents()
// Start sending network coordinate to the server.
if config.EnableCoordinates {
go agent.sendCoordinate()
}
// Write out the PID file if necessary // Write out the PID file if necessary
err = agent.storePid() err = agent.storePid()
if err != nil { if err != nil {
return nil, err return nil, err
} }
// Start sending network coordinates to servers
go agent.sendCoordinates()
return agent, nil return agent, nil
} }
@ -560,12 +562,13 @@ func (a *Agent) ResumeSync() {
a.state.Resume() a.state.Resume()
} }
// sendCoordinates starts a loop that periodically sends the local coordinate // sendCoordinate is a long-running loop that periodically sends our coordinate
// to a server // to the server. Closing the agent's shutdownChannel will cause this to exit.
func (a *Agent) sendCoordinates() { func (a *Agent) sendCoordinate() {
for { for {
intv := aeScale(a.config.SyncCoordinateInterval, len(a.LANMembers())) intv := aeScale(a.config.SyncCoordinateInterval, len(a.LANMembers()))
intv = intv + randomStagger(intv) intv = intv + randomStagger(intv)
select { select {
case <-time.After(intv): case <-time.After(intv):
var c *coordinate.Coordinate var c *coordinate.Coordinate
@ -577,14 +580,14 @@ func (a *Agent) sendCoordinates() {
req := structs.CoordinateUpdateRequest{ req := structs.CoordinateUpdateRequest{
Datacenter: a.config.Datacenter, Datacenter: a.config.Datacenter,
Node: a.config.NodeName, Node: a.config.NodeName,
Op: structs.CoordinateSet, Op: structs.CoordinateUpdate,
Coord: c, Coord: c,
WriteRequest: structs.WriteRequest{Token: a.config.ACLToken}, WriteRequest: structs.WriteRequest{Token: a.config.ACLToken},
} }
var reply struct{} var reply struct{}
if err := a.RPC("Coordinate.Update", &req, &reply); err != nil { if err := a.RPC("Coordinate.Update", &req, &reply); err != nil {
a.logger.Printf("[ERR] agent: coordinate update error: %s", err.Error()) a.logger.Printf("[ERR] agent: coordinate update error: %s", err)
} }
case <-a.shutdownCh: case <-a.shutdownCh:
return return

View File

@ -368,13 +368,17 @@ type Config struct {
AtlasEndpoint string `mapstructure:"atlas_endpoint"` AtlasEndpoint string `mapstructure:"atlas_endpoint"`
// AEInterval controls the anti-entropy interval. This is how often // AEInterval controls the anti-entropy interval. This is how often
// the agent attempts to reconcile it's local state with the server' // the agent attempts to reconcile its local state with the server's
// representation of our state. Defaults to every 60s. // representation of our state. Defaults to every 60s.
AEInterval time.Duration `mapstructure:"-" json:"-"` AEInterval time.Duration `mapstructure:"-" json:"-"`
// SyncCoordinateInterval controls the interval for sending network coordinates // EnableCoordinates enables features related to network coordinates.
// to servers. Defaults to every 15s, but scales up as the number of nodes increases EnableCoordinates bool `mapstructure:"enable_coordinates" json:"-"`
// in the network, to prevent servers from being overwhelmed.
// SyncCoordinateInterval controls the interval for sending network
// coordinates to the server. Defaults to every 15s, but scales up as
// the number of nodes increases in the network, to prevent servers from
// being overwhelmed.
SyncCoordinateInterval time.Duration `mapstructure:"-" json:"-"` SyncCoordinateInterval time.Duration `mapstructure:"-" json:"-"`
// Checks holds the provided check definitions // Checks holds the provided check definitions
@ -471,6 +475,7 @@ func DefaultConfig() *Config {
Protocol: consul.ProtocolVersionMax, Protocol: consul.ProtocolVersionMax,
CheckUpdateInterval: 5 * time.Minute, CheckUpdateInterval: 5 * time.Minute,
AEInterval: time.Minute, AEInterval: time.Minute,
EnableCoordinates: true,
SyncCoordinateInterval: 15 * time.Second, SyncCoordinateInterval: 15 * time.Second,
ACLTTL: 30 * time.Second, ACLTTL: 30 * time.Second,
ACLDownPolicy: "extend-cache", ACLDownPolicy: "extend-cache",

View File

@ -387,6 +387,12 @@ func TestAgentAntiEntropy_Services_WithChecks(t *testing.T) {
} }
} }
var testRegisterRules = `
service "api" {
policy = "write"
}
`
func TestAgentAntiEntropy_Services_ACLDeny(t *testing.T) { func TestAgentAntiEntropy_Services_ACLDeny(t *testing.T) {
conf := nextConfig() conf := nextConfig()
conf.ACLDatacenter = "dc1" conf.ACLDatacenter = "dc1"
@ -796,55 +802,31 @@ func TestAgent_nestedPauseResume(t *testing.T) {
} }
var testRegisterRules = `
service "api" {
policy = "write"
}
`
func TestAgentSendCoordinates(t *testing.T) {
conf1 := nextConfig()
conf1.SyncCoordinateInterval = 10 * time.Millisecond
dir1, agent1 := makeAgent(t, conf1)
defer os.RemoveAll(dir1)
defer agent1.Shutdown()
conf2 := nextConfig() func TestAgent_sendCoordinate(t *testing.T) {
conf2.SyncCoordinateInterval = 10 * time.Millisecond conf := nextConfig()
dir2, agent2 := makeAgent(t, conf2) conf.SyncCoordinateInterval = 10 * time.Millisecond
defer os.RemoveAll(dir2) conf.ConsulConfig.CoordinateUpdatePeriod = 0 * time.Millisecond
defer agent2.Shutdown() dir, agent := makeAgent(t, conf)
defer os.RemoveAll(dir)
defer agent.Shutdown()
agent2Addr := fmt.Sprintf("127.0.0.1:%d", agent2.config.Ports.SerfLan) testutil.WaitForLeader(t, agent.RPC, "dc1")
if _, err := agent2.JoinLAN([]string{agent2Addr}); err != nil {
t.Fatalf("err: %s", err) // Wait a little while for an update.
time.Sleep(3 * conf.SyncCoordinateInterval)
// Make sure the coordinate is present.
req := structs.NodeSpecificRequest {
Datacenter: agent.config.Datacenter,
Node: agent.config.NodeName,
} }
testutil.WaitForLeader(t, agent1.RPC, "dc1")
time.Sleep(100 * time.Millisecond)
var reply structs.IndexedCoordinate var reply structs.IndexedCoordinate
req := structs.CoordinateGetRequest{ if err := agent.RPC("Coordinate.Get", &req, &reply); err != nil {
Datacenter: agent1.config.Datacenter,
Node: agent1.config.NodeName,
}
if err := agent1.RPC("Coordinate.Get", &req, &reply); err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
if reply.Coord == nil { if reply.Coord == nil {
t.Fatalf("should get a coordinate") t.Fatalf("should get a coordinate")
} }
var reply2 structs.IndexedCoordinate
req2 := structs.CoordinateGetRequest{
Datacenter: agent2.config.Datacenter,
Node: agent2.config.NodeName,
}
if err := agent1.RPC("Coordinate.Get", &req2, &reply2); err != nil {
t.Fatalf("err: %v", err)
}
if reply2.Coord == nil {
t.Fatalf("should get a coordinate")
}
} }

View File

@ -380,7 +380,8 @@ func (c *Client) Stats() map[string]map[string]string {
return stats return stats
} }
// GetCoordinate returns the network coordinate of the receiver // GetCoordinate returns the network coordinate of the current node, as
// maintained by Serf.
func (c *Client) GetCoordinate() *coordinate.Coordinate { func (c *Client) GetCoordinate() *coordinate.Coordinate {
return c.serf.GetCoordinate() return c.serf.GetCoordinate()
} }

View File

@ -206,13 +206,14 @@ type Config struct {
// EnableCoordinates enables features related to network coordinates. // EnableCoordinates enables features related to network coordinates.
EnableCoordinates bool EnableCoordinates bool
// CoordinateUpdatePeriod controls how long a server batches coordinate updates // CoordinateUpdatePeriod controls how long a server batches coordinate
// before applying them in a Raft transaction. A larger period leads to fewer // updates before applying them in a Raft transaction. A larger period
// Raft transactions, but also the stored coordinates being more stale. // leads to fewer Raft transactions, but also the stored coordinates
// being more stale.
CoordinateUpdatePeriod time.Duration CoordinateUpdatePeriod time.Duration
// CoordinateUpdateMaxBatchSize controls the maximum number of updates a // CoordinateUpdateMaxBatchSize controls the maximum number of updates a
// server batches before applying them in a Raft transaction // server batches before applying them in a Raft transaction.
CoordinateUpdateMaxBatchSize int CoordinateUpdateMaxBatchSize int
} }
@ -269,7 +270,7 @@ func DefaultConfig() *Config {
TombstoneTTLGranularity: 30 * time.Second, TombstoneTTLGranularity: 30 * time.Second,
SessionTTLMin: 10 * time.Second, SessionTTLMin: 10 * time.Second,
EnableCoordinates: true, EnableCoordinates: true,
CoordinateUpdatePeriod: time.Duration(30) * time.Second, CoordinateUpdatePeriod: 30 * time.Second,
CoordinateUpdateMaxBatchSize: 1000, CoordinateUpdateMaxBatchSize: 1000,
} }

View File

@ -7,15 +7,68 @@ import (
"github.com/hashicorp/consul/consul/structs" "github.com/hashicorp/consul/consul/structs"
) )
// Coordinate manages queries and updates for network coordinates.
type Coordinate struct { type Coordinate struct {
srv *Server // srv is a pointer back to the server.
updateLastSent time.Time srv *Server
updateBuffer []*structs.CoordinateUpdateRequest
// updateLastSent is the last time we flushed pending coordinate updates
// to the Raft log. CoordinateUpdatePeriod is used to control how long we
// wait before doing an update (that time, or hitting more than the
// configured CoordinateUpdateMaxBatchSize, whichever comes first).
updateLastSent time.Time
// updateBuffer holds the pending coordinate updates, waiting to be
// flushed to the Raft log.
updateBuffer []*structs.CoordinateUpdateRequest
// updateBufferLock manages concurrent access to updateBuffer.
updateBufferLock sync.Mutex updateBufferLock sync.Mutex
} }
// GetLAN returns the the LAN coordinate of a node. // NewCoordinate returns a new Coordinate endpoint.
func (c *Coordinate) GetLAN(args *structs.NodeSpecificRequest, reply *structs.IndexedCoordinate) error { func NewCoordinate(srv *Server) *Coordinate {
return &Coordinate{
srv: srv,
updateLastSent: time.Now(),
}
}
// Update handles requests to update the LAN coordinate of a node.
func (c *Coordinate) Update(args *structs.CoordinateUpdateRequest, reply *struct{}) error {
if done, err := c.srv.forward("Coordinate.Update", args, args, reply); done {
return err
}
c.updateBufferLock.Lock()
defer c.updateBufferLock.Unlock()
c.updateBuffer = append(c.updateBuffer, args)
// Process updates in batches to avoid tons of small transactions against
// the Raft log.
shouldFlush := time.Since(c.updateLastSent) > c.srv.config.CoordinateUpdatePeriod ||
len(c.updateBuffer) > c.srv.config.CoordinateUpdateMaxBatchSize
if shouldFlush {
// This transaction could take a while so we don't block here.
buf := c.updateBuffer
go func() {
_, err := c.srv.raftApply(structs.CoordinateRequestType, buf)
if err != nil {
c.srv.logger.Printf("[ERR] consul.coordinate: Update failed: %v", err)
}
}()
// We clear the buffer regardless of whether the raft transaction
// succeeded, just so the buffer doesn't keep growing without bound.
c.updateLastSent = time.Now()
c.updateBuffer = nil
}
return nil
}
// Get returns the coordinate of the given node in the LAN.
func (c *Coordinate) Get(args *structs.NodeSpecificRequest, reply *structs.IndexedCoordinate) error {
if done, err := c.srv.forward("Coordinate.GetLAN", args, args, reply); done { if done, err := c.srv.forward("Coordinate.GetLAN", args, args, reply); done {
return err return err
} }
@ -35,61 +88,3 @@ func (c *Coordinate) GetLAN(args *structs.NodeSpecificRequest, reply *structs.In
return err return err
}) })
} }
// GetWAN returns the WAN coordinates of the servers in a given datacenter.
//
// Note that the server does not necessarily know about *all* servers in the given datacenter.
// It just returns the coordinates of those that it knows.
func (c *Coordinate) GetWAN(args *structs.DCSpecificRequest, reply *structs.CoordinateList) error {
if args.Datacenter == c.srv.config.Datacenter {
reply.Coords = make([]structs.Coordinate, 1)
reply.Coords[0] = structs.Coordinate{
Node: c.srv.config.NodeName,
Coord: c.srv.GetWANCoordinate(),
}
} else {
servers := c.srv.remoteConsuls[args.Datacenter] // servers in the specified DC
reply.Coords = make([]structs.Coordinate, 0)
for i := 0; i < len(servers); i++ {
if coord := c.srv.serfWAN.GetCachedCoordinate(servers[i].Name); coord != nil {
reply.Coords = append(reply.Coords, structs.Coordinate{
Node: servers[i].Name,
Coord: coord,
})
}
}
}
return nil
}
func flushCoordinates(c *Coordinate, buf []*structs.CoordinateUpdateRequest) {
_, err := c.srv.raftApply(structs.CoordinateRequestType, buf)
if err != nil {
c.srv.logger.Printf("[ERR] consul.coordinate: Update failed: %v", err)
}
}
// Update updates the the LAN coordinate of a node.
func (c *Coordinate) Update(args *structs.CoordinateUpdateRequest, reply *struct{}) error {
if done, err := c.srv.forward("Coordinate.Update", args, args, reply); done {
return err
}
c.updateBufferLock.Lock()
defer c.updateBufferLock.Unlock()
c.updateBuffer = append(c.updateBuffer, args)
if time.Since(c.updateLastSent) > c.srv.config.CoordinateUpdatePeriod || len(c.updateBuffer) > c.srv.config.CoordinateUpdateMaxBatchSize {
c.srv.logger.Printf("sending update for %v", args.Node)
// Apply the potentially time-consuming transaction out of band
go flushCoordinates(c, c.updateBuffer)
// We clear the buffer regardless of whether the raft transaction succeeded, just so the
// buffer doesn't keep growing without bound.
c.updateLastSent = time.Now()
c.updateBuffer = nil
}
return nil
}

View File

@ -13,67 +13,68 @@ import (
"github.com/hashicorp/serf/coordinate" "github.com/hashicorp/serf/coordinate"
) )
// getRandomCoordinate generates a random coordinate. // generateRandomCoordinate creates a random coordinate. This mucks with the
func getRandomCoordinate() *coordinate.Coordinate { // underlying structure directly, so it's not really useful for any particular
// position in the network, but it's a good payload to send through to make
// sure things come out the other side or get stored correctly.
func generateRandomCoordinate() *coordinate.Coordinate {
config := coordinate.DefaultConfig() config := coordinate.DefaultConfig()
// Randomly apply updates between n clients coord := coordinate.NewCoordinate(config)
n := 5 for i := range coord.Vec {
clients := make([]*coordinate.Client, n) coord.Vec[i] = rand.NormFloat64()
for i := 0; i < n; i++ {
clients[i], _ = coordinate.NewClient(config)
} }
coord.Error = rand.NormFloat64()
for i := 0; i < n*100; i++ { coord.Adjustment = rand.NormFloat64()
k1 := rand.Intn(n) return coord
k2 := rand.Intn(n)
if k1 == k2 {
continue
}
clients[k1].Update(clients[k2].GetCoordinate(), time.Duration(rand.Int63())*time.Microsecond)
}
return clients[rand.Intn(n)].GetCoordinate()
} }
func coordinatesEqual(a, b *coordinate.Coordinate) bool { // verifyCoordinatesEqual will compare a and b and fail if they are not exactly
return reflect.DeepEqual(a, b) // equal (no floating point fuzz is considered).
func verifyCoordinatesEqual(t *testing.T, a, b *coordinate.Coordinate) {
if !reflect.DeepEqual(a, b) {
t.Fatalf("coordinates are not equal: %v != %v", a, b)
}
} }
func TestCoordinate_Update(t *testing.T) { func TestCoordinate_Update(t *testing.T) {
name := fmt.Sprintf("Node %d", getPort()) name := fmt.Sprintf("Node %d", getPort())
dir1, config1 := testServerConfig(t, name) dir1, config1 := testServerConfig(t, name)
config1.CoordinateUpdatePeriod = 1000 * time.Millisecond defer os.RemoveAll(dir1)
config1.CoordinateUpdatePeriod = 1 * time.Second
config1.CoordinateUpdateMaxBatchSize = 5
s1, err := NewServer(config1) s1, err := NewServer(config1)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
defer os.RemoveAll(dir1)
defer s1.Shutdown() defer s1.Shutdown()
client := rpcClient(t, s1) client := rpcClient(t, s1)
defer client.Close() defer client.Close()
testutil.WaitForLeader(t, client.Call, "dc1") testutil.WaitForLeader(t, client.Call, "dc1")
arg1 := structs.CoordinateUpdateRequest{ arg1 := structs.CoordinateUpdateRequest{
Datacenter: "dc1", Datacenter: "dc1",
Node: "node1", Node: "node1",
Op: structs.CoordinateSet, Op: structs.CoordinateUpdate,
Coord: getRandomCoordinate(), Coord: generateRandomCoordinate(),
} }
arg2 := structs.CoordinateUpdateRequest{ arg2 := structs.CoordinateUpdateRequest{
Datacenter: "dc1", Datacenter: "dc1",
Node: "node2", Node: "node2",
Op: structs.CoordinateSet, Op: structs.CoordinateUpdate,
Coord: getRandomCoordinate(), Coord: generateRandomCoordinate(),
} }
// Send an update for the first node.
var out struct{} var out struct{}
if err := client.Call("Coordinate.Update", &arg1, &out); err != nil { if err := client.Call("Coordinate.Update", &arg1, &out); err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
// Verify // Make sure the update did not yet apply because the batching thresholds
// haven't yet been met.
state := s1.fsm.State() state := s1.fsm.State()
_, d, err := state.CoordinateGet("node1") _, d, err := state.CoordinateGet("node1")
if err != nil { if err != nil {
@ -83,12 +84,15 @@ func TestCoordinate_Update(t *testing.T) {
t.Fatalf("should be nil because the update should be batched") t.Fatalf("should be nil because the update should be batched")
} }
// Wait a while and send another update; this time the updates should be sent // Wait a while and send another update. This time both updates should
// be applied.
time.Sleep(2 * s1.config.CoordinateUpdatePeriod) time.Sleep(2 * s1.config.CoordinateUpdatePeriod)
if err := client.Call("Coordinate.Update", &arg2, &out); err != nil { if err := client.Call("Coordinate.Update", &arg2, &out); err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
// Yield the current goroutine to allow the goroutine that sends the updates to run
// Wait a little while so the flush goroutine can run, then make sure
// both coordinates made it in.
time.Sleep(100 * time.Millisecond) time.Sleep(100 * time.Millisecond)
_, d, err = state.CoordinateGet("node1") _, d, err = state.CoordinateGet("node1")
@ -98,9 +102,7 @@ func TestCoordinate_Update(t *testing.T) {
if d == nil { if d == nil {
t.Fatalf("should return a coordinate but it's nil") t.Fatalf("should return a coordinate but it's nil")
} }
if !coordinatesEqual(d.Coord, arg1.Coord) { verifyCoordinatesEqual(t, d.Coord, arg1.Coord)
t.Fatalf("should be equal\n%v\n%v", d.Coord, arg1.Coord)
}
_, d, err = state.CoordinateGet("node2") _, d, err = state.CoordinateGet("node2")
if err != nil { if err != nil {
@ -109,12 +111,32 @@ func TestCoordinate_Update(t *testing.T) {
if d == nil { if d == nil {
t.Fatalf("should return a coordinate but it's nil") t.Fatalf("should return a coordinate but it's nil")
} }
if !coordinatesEqual(d.Coord, arg2.Coord) { verifyCoordinatesEqual(t, d.Coord, arg2.Coord)
t.Fatalf("should be equal\n%v\n%v", d.Coord, arg2.Coord)
// Now try spamming coordinates and make sure it flushes when the batch
// size is hit.
for i := 0; i < (s1.config.CoordinateUpdateMaxBatchSize + 1); i++ {
arg1.Coord = generateRandomCoordinate()
if err := client.Call("Coordinate.Update", &arg1, &out); err != nil {
t.Fatalf("err: %v", err)
}
} }
// Wait a little while so the flush goroutine can run, then make sure
// the last coordinate update made it in.
time.Sleep(100 * time.Millisecond)
_, d, err = state.CoordinateGet("node1")
if err != nil {
t.Fatalf("err: %v", err)
}
if d == nil {
t.Fatalf("should return a coordinate but it's nil")
}
verifyCoordinatesEqual(t, d.Coord, arg1.Coord)
} }
func TestCoordinate_GetLAN(t *testing.T) { func TestCoordinate_Get(t *testing.T) {
dir1, s1 := testServer(t) dir1, s1 := testServer(t)
defer os.RemoveAll(dir1) defer os.RemoveAll(dir1)
defer s1.Shutdown() defer s1.Shutdown()
@ -126,103 +148,39 @@ func TestCoordinate_GetLAN(t *testing.T) {
arg := structs.CoordinateUpdateRequest{ arg := structs.CoordinateUpdateRequest{
Datacenter: "dc1", Datacenter: "dc1",
Node: "node1", Node: "node1",
Op: structs.CoordinateSet, Op: structs.CoordinateUpdate,
Coord: getRandomCoordinate(), Coord: generateRandomCoordinate(),
} }
// Send an initial update, waiting a little while for the flush goroutine
// to run.
var out struct{} var out struct{}
if err := client.Call("Coordinate.Update", &arg, &out); err != nil { if err := client.Call("Coordinate.Update", &arg, &out); err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
// Yield the current goroutine to allow the goroutine that sends the updates to run
time.Sleep(100 * time.Millisecond) time.Sleep(100 * time.Millisecond)
// Get via RPC // Query the coordinate via RPC.
out2 := structs.IndexedCoordinate{}
arg2 := structs.NodeSpecificRequest{ arg2 := structs.NodeSpecificRequest{
Datacenter: "dc1", Datacenter: "dc1",
Node: "node1", Node: "node1",
} }
if err := client.Call("Coordinate.GetLAN", &arg2, &out2); err != nil { coord := structs.IndexedCoordinate{}
if err := client.Call("Coordinate.Get", &arg2, &coord); err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
if !coordinatesEqual(out2.Coord, arg.Coord) { verifyCoordinatesEqual(t, coord.Coord, arg.Coord)
t.Fatalf("should be equal\n%v\n%v", out2.Coord, arg.Coord)
}
// Now let's override the original coordinate; Coordinate.Get should return // Send another coordinate update, waiting after for the flush.
// the latest coordinate arg.Coord = generateRandomCoordinate()
arg.Coord = getRandomCoordinate()
if err := client.Call("Coordinate.Update", &arg, &out); err != nil { if err := client.Call("Coordinate.Update", &arg, &out); err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
// Yield the current goroutine to allow the goroutine that sends the updates to run
time.Sleep(100 * time.Millisecond) time.Sleep(100 * time.Millisecond)
if err := client.Call("Coordinate.GetLAN", &arg2, &out2); err != nil { // Now re-query and make sure the results are fresh.
if err := client.Call("Coordinate.Get", &arg2, &coord); err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
if !coordinatesEqual(out2.Coord, arg.Coord) { verifyCoordinatesEqual(t, coord.Coord, arg.Coord)
t.Fatalf("should be equal\n%v\n%v", out2.Coord, arg.Coord)
}
}
func TestCoordinate_GetWAN(t *testing.T) {
// Create 1 server in dc1, 2 servers in dc2
dir1, s1 := testServerDC(t, "dc1")
defer os.RemoveAll(dir1)
defer s1.Shutdown()
dir2, s2 := testServerDC(t, "dc2")
defer os.RemoveAll(dir2)
defer s2.Shutdown()
dir3, s3 := testServerDC(t, "dc2")
defer os.RemoveAll(dir3)
defer s3.Shutdown()
client := rpcClient(t, s1)
defer client.Close()
testutil.WaitForLeader(t, client.Call, "dc1")
// Try to join
addr := fmt.Sprintf("127.0.0.1:%d",
s1.config.SerfWANConfig.MemberlistConfig.BindPort)
if _, err := s2.JoinWAN([]string{addr}); err != nil {
t.Fatalf("err: %v", err)
}
if _, err := s3.JoinWAN([]string{addr}); err != nil {
t.Fatalf("err: %v", err)
}
// Check the members
testutil.WaitForResult(func() (bool, error) {
return len(s1.WANMembers()) == 3, nil
}, func(err error) {
t.Fatalf("bad len")
})
// Wait for coordinates to be exchanged
time.Sleep(s1.config.SerfWANConfig.MemberlistConfig.ProbeInterval * 2)
var coords structs.CoordinateList
arg := structs.DCSpecificRequest{
Datacenter: "dc1",
}
if err := client.Call("Coordinate.GetWAN", &arg, &coords); err != nil {
t.Fatalf("err: %v", err)
}
if len(coords.Coords) != 1 {
t.Fatalf("there is 1 server in dc1")
}
arg = structs.DCSpecificRequest{
Datacenter: "dc2",
}
if err := client.Call("Coordinate.GetWAN", &arg, &coords); err != nil {
t.Fatalf("err: %v", err)
}
if len(coords.Coords) != 2 {
t.Fatalf("there are 2 servers in dc2")
}
} }

View File

@ -253,11 +253,10 @@ func (c *consulFSM) applyCoordinateOperation(buf []byte, index uint64) interface
if err := structs.Decode(buf, &reqs); err != nil { if err := structs.Decode(buf, &reqs); err != nil {
panic(fmt.Errorf("failed to decode request: %v", err)) panic(fmt.Errorf("failed to decode request: %v", err))
} }
for i := 0; i < len(reqs); i++ { for _, req := range reqs {
req := reqs[i]
defer metrics.MeasureSince([]string{"consul", "fsm", "coordinate", string(req.Op)}, time.Now()) defer metrics.MeasureSince([]string{"consul", "fsm", "coordinate", string(req.Op)}, time.Now())
switch req.Op { switch req.Op {
case structs.CoordinateSet: case structs.CoordinateUpdate:
coord := &structs.Coordinate{Node: req.Node, Coord: req.Coord} coord := &structs.Coordinate{Node: req.Node, Coord: req.Coord}
if err := c.state.CoordinateUpdate(index, coord); err != nil { if err := c.state.CoordinateUpdate(index, coord); err != nil {
return err return err

View File

@ -740,13 +740,19 @@ func TestFSM_CoordinateUpdate(t *testing.T) {
} }
defer fsm.Close() defer fsm.Close()
nodeName := "Node1" // Write a batch of two coordinates.
reqs := make([]*structs.CoordinateUpdateRequest, 1) reqs := make([]*structs.CoordinateUpdateRequest, 2)
reqs[0] = &structs.CoordinateUpdateRequest{ reqs[0] = &structs.CoordinateUpdateRequest{
Datacenter: "dc1", Datacenter: "dc1",
Node: nodeName, Node: "node1",
Op: structs.CoordinateSet, Op: structs.CoordinateUpdate,
Coord: getRandomCoordinate(), Coord: generateRandomCoordinate(),
}
reqs[1] = &structs.CoordinateUpdateRequest{
Datacenter: "dc1",
Node: "node2",
Op: structs.CoordinateUpdate,
Coord: generateRandomCoordinate(),
} }
buf, err := structs.Encode(structs.CoordinateRequestType, reqs) buf, err := structs.Encode(structs.CoordinateRequestType, reqs)
if err != nil { if err != nil {
@ -757,17 +763,24 @@ func TestFSM_CoordinateUpdate(t *testing.T) {
t.Fatalf("resp: %v", resp) t.Fatalf("resp: %v", resp)
} }
// Verify key is set // Read back the two coordinates to make sure they got updated.
_, d, err := fsm.state.CoordinateGet(nodeName) _, d, err := fsm.state.CoordinateGet(reqs[0].Node)
if err != nil { if err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
if d == nil { if d == nil {
t.Fatalf("missing") t.Fatalf("missing")
} }
if !coordinatesEqual(reqs[0].Coord, d.Coord) { verifyCoordinatesEqual(t, reqs[0].Coord, d.Coord)
t.Fatalf("wrong coordinate")
_, d, err = fsm.state.CoordinateGet(reqs[1].Node)
if err != nil {
t.Fatalf("err: %v", err)
} }
if d == nil {
t.Fatalf("missing")
}
verifyCoordinatesEqual(t, reqs[1].Coord, d.Coord)
} }
func TestFSM_SessionCreate_Destroy(t *testing.T) { func TestFSM_SessionCreate_Destroy(t *testing.T) {

View File

@ -309,12 +309,11 @@ func (s *Server) setupSerf(conf *serf.Config, ch chan serf.Event, path string, w
return nil, err return nil, err
} }
// Let Serf cache coordinates for only the WAN configuration where the
// number of nodes should be small, and where we serve these directly
// from Serf because they aren't managed in the catalog.
conf.EnableCoordinates = s.config.EnableCoordinates conf.EnableCoordinates = s.config.EnableCoordinates
if conf.EnableCoordinates && wan { conf.CacheCoordinates = s.config.EnableCoordinates && wan
// Cache coordinates only if it's the wan network where the number of nodes is
// reasonably low.
conf.CacheCoordinates = true
}
return serf.Create(conf) return serf.Create(conf)
} }
@ -406,10 +405,7 @@ func (s *Server) setupRPC(tlsWrap tlsutil.DCWrapper) error {
s.endpoints.Session = &Session{s} s.endpoints.Session = &Session{s}
s.endpoints.Internal = &Internal{s} s.endpoints.Internal = &Internal{s}
s.endpoints.ACL = &ACL{s} s.endpoints.ACL = &ACL{s}
s.endpoints.Coordinate = &Coordinate{ s.endpoints.Coordinate = NewCoordinate(s)
srv: s,
updateLastSent: time.Now(),
}
// Register the handlers // Register the handlers
s.rpcServer.Register(s.endpoints.Status) s.rpcServer.Register(s.endpoints.Status)
@ -706,12 +702,12 @@ func (s *Server) Stats() map[string]map[string]string {
return stats return stats
} }
// GetLANCoordinate returns the LAN coordinate of the server // GetLANCoordinate returns the coordinate of the server in the LAN gossip pool.
func (s *Server) GetLANCoordinate() *coordinate.Coordinate { func (s *Server) GetLANCoordinate() *coordinate.Coordinate {
return s.serfLAN.GetCoordinate() return s.serfLAN.GetCoordinate()
} }
// GetWANCoordinate returns the WAN coordinate of the server // GetWANCoordinate returns the coordinate of the server in the WAN gossip pool.
func (s *Server) GetWANCoordinate() *coordinate.Coordinate { func (s *Server) GetWANCoordinate() *coordinate.Coordinate {
return s.serfWAN.GetCoordinate() return s.serfWAN.GetCoordinate()
} }

View File

@ -68,7 +68,7 @@ func testServerConfig(t *testing.T, NodeName string) (string, *Config) {
config.ReconcileInterval = 100 * time.Millisecond config.ReconcileInterval = 100 * time.Millisecond
config.EnableCoordinates = true config.EnableCoordinates = true
config.CoordinateUpdatePeriod = 0 // make updates instant config.CoordinateUpdatePeriod = 0 * time.Millisecond
return dir, config return dir, config
} }

View File

@ -620,28 +620,36 @@ type ACLPolicy struct {
QueryMeta QueryMeta
} }
// Coordinate stores a mapping from a node to its network coordinate // Coordinate stores a node name with its associated network coordinate. This is
// used when querying WAN coordinates.
type Coordinate struct { type Coordinate struct {
Node string Node string
Coord *coordinate.Coordinate Coord *coordinate.Coordinate
} }
// CoordinateList is a list of Coordinate structs. This is used when querying WAN
// coordinates.
type CoordinateList struct { type CoordinateList struct {
Coords []Coordinate Coords []Coordinate
} }
// IndexedCoordinate is used to represent a single node's coordinate from the state
// store. This is used when querying LAN coordinates.
type IndexedCoordinate struct { type IndexedCoordinate struct {
Coord *coordinate.Coordinate Coord *coordinate.Coordinate
QueryMeta QueryMeta
} }
// CoordinateOp is used for encoding coordinate-related RPC requests.
type CoordinateOp string type CoordinateOp string
const ( const (
CoordinateSet CoordinateOp = "set" // CoordinateUpdate is used to update a node's coordinates in the catalog.
CoordinateUpdate CoordinateOp = "update"
) )
// CoordinateUpdateRequest is used to update the network coordinate of a given node // CoordinateUpdateRequest is used to update the network coordinate of a given
// node.
type CoordinateUpdateRequest struct { type CoordinateUpdateRequest struct {
Datacenter string Datacenter string
Node string Node string
@ -650,6 +658,7 @@ type CoordinateUpdateRequest struct {
WriteRequest WriteRequest
} }
// RequestDatacenter returns the datacenter for a given update request.
func (c *CoordinateUpdateRequest) RequestDatacenter() string { func (c *CoordinateUpdateRequest) RequestDatacenter() string {
return c.Datacenter return c.Datacenter
} }