From 86b112fe31ee9cda5b30dca3a6021d5f255be289 Mon Sep 17 00:00:00 2001 From: James Phillips Date: Fri, 5 Jun 2015 20:31:33 -0700 Subject: [PATCH] Does a clean up pass on the Consul side. --- command/agent/agent.go | 19 +-- command/agent/config.go | 13 ++- command/agent/local_test.go | 64 ++++------ consul/client.go | 3 +- consul/config.go | 11 +- consul/coordinate_endpoint.go | 121 +++++++++---------- consul/coordinate_endpoint_test.go | 182 +++++++++++------------------ consul/fsm.go | 5 +- consul/fsm_test.go | 31 +++-- consul/server.go | 18 ++- consul/server_test.go | 2 +- consul/structs/structs.go | 15 ++- 12 files changed, 223 insertions(+), 261 deletions(-) diff --git a/command/agent/agent.go b/command/agent/agent.go index 9d49d14748..a3f7ac739e 100644 --- a/command/agent/agent.go +++ b/command/agent/agent.go @@ -192,15 +192,17 @@ func Create(config *Config, logOutput io.Writer) (*Agent, error) { // Start handling events go agent.handleEvents() + // Start sending network coordinate to the server. + if config.EnableCoordinates { + go agent.sendCoordinate() + } + // Write out the PID file if necessary err = agent.storePid() if err != nil { return nil, err } - // Start sending network coordinates to servers - go agent.sendCoordinates() - return agent, nil } @@ -560,12 +562,13 @@ func (a *Agent) ResumeSync() { a.state.Resume() } -// sendCoordinates starts a loop that periodically sends the local coordinate -// to a server -func (a *Agent) sendCoordinates() { +// sendCoordinate is a long-running loop that periodically sends our coordinate +// to the server. Closing the agent's shutdownChannel will cause this to exit. +func (a *Agent) sendCoordinate() { for { intv := aeScale(a.config.SyncCoordinateInterval, len(a.LANMembers())) intv = intv + randomStagger(intv) + select { case <-time.After(intv): var c *coordinate.Coordinate @@ -577,14 +580,14 @@ func (a *Agent) sendCoordinates() { req := structs.CoordinateUpdateRequest{ Datacenter: a.config.Datacenter, Node: a.config.NodeName, - Op: structs.CoordinateSet, + Op: structs.CoordinateUpdate, Coord: c, WriteRequest: structs.WriteRequest{Token: a.config.ACLToken}, } var reply struct{} if err := a.RPC("Coordinate.Update", &req, &reply); err != nil { - a.logger.Printf("[ERR] agent: coordinate update error: %s", err.Error()) + a.logger.Printf("[ERR] agent: coordinate update error: %s", err) } case <-a.shutdownCh: return diff --git a/command/agent/config.go b/command/agent/config.go index 3152fb2b73..fbaccfec66 100644 --- a/command/agent/config.go +++ b/command/agent/config.go @@ -368,13 +368,17 @@ type Config struct { AtlasEndpoint string `mapstructure:"atlas_endpoint"` // AEInterval controls the anti-entropy interval. This is how often - // the agent attempts to reconcile it's local state with the server' + // the agent attempts to reconcile its local state with the server's // representation of our state. Defaults to every 60s. AEInterval time.Duration `mapstructure:"-" json:"-"` - // SyncCoordinateInterval controls the interval for sending network coordinates - // to servers. Defaults to every 15s, but scales up as the number of nodes increases - // in the network, to prevent servers from being overwhelmed. + // EnableCoordinates enables features related to network coordinates. + EnableCoordinates bool `mapstructure:"enable_coordinates" json:"-"` + + // SyncCoordinateInterval controls the interval for sending network + // coordinates to the server. Defaults to every 15s, but scales up as + // the number of nodes increases in the network, to prevent servers from + // being overwhelmed. SyncCoordinateInterval time.Duration `mapstructure:"-" json:"-"` // Checks holds the provided check definitions @@ -471,6 +475,7 @@ func DefaultConfig() *Config { Protocol: consul.ProtocolVersionMax, CheckUpdateInterval: 5 * time.Minute, AEInterval: time.Minute, + EnableCoordinates: true, SyncCoordinateInterval: 15 * time.Second, ACLTTL: 30 * time.Second, ACLDownPolicy: "extend-cache", diff --git a/command/agent/local_test.go b/command/agent/local_test.go index 3d78038e4a..b2499d7035 100644 --- a/command/agent/local_test.go +++ b/command/agent/local_test.go @@ -387,6 +387,12 @@ func TestAgentAntiEntropy_Services_WithChecks(t *testing.T) { } } +var testRegisterRules = ` +service "api" { + policy = "write" +} +` + func TestAgentAntiEntropy_Services_ACLDeny(t *testing.T) { conf := nextConfig() conf.ACLDatacenter = "dc1" @@ -796,55 +802,31 @@ func TestAgent_nestedPauseResume(t *testing.T) { } -var testRegisterRules = ` -service "api" { - policy = "write" -} -` -func TestAgentSendCoordinates(t *testing.T) { - conf1 := nextConfig() - conf1.SyncCoordinateInterval = 10 * time.Millisecond - dir1, agent1 := makeAgent(t, conf1) - defer os.RemoveAll(dir1) - defer agent1.Shutdown() - conf2 := nextConfig() - conf2.SyncCoordinateInterval = 10 * time.Millisecond - dir2, agent2 := makeAgent(t, conf2) - defer os.RemoveAll(dir2) - defer agent2.Shutdown() +func TestAgent_sendCoordinate(t *testing.T) { + conf := nextConfig() + conf.SyncCoordinateInterval = 10 * time.Millisecond + conf.ConsulConfig.CoordinateUpdatePeriod = 0 * time.Millisecond + dir, agent := makeAgent(t, conf) + defer os.RemoveAll(dir) + defer agent.Shutdown() - agent2Addr := fmt.Sprintf("127.0.0.1:%d", agent2.config.Ports.SerfLan) - if _, err := agent2.JoinLAN([]string{agent2Addr}); err != nil { - t.Fatalf("err: %s", err) + testutil.WaitForLeader(t, agent.RPC, "dc1") + + // Wait a little while for an update. + time.Sleep(3 * conf.SyncCoordinateInterval) + + // Make sure the coordinate is present. + req := structs.NodeSpecificRequest { + Datacenter: agent.config.Datacenter, + Node: agent.config.NodeName, } - - testutil.WaitForLeader(t, agent1.RPC, "dc1") - - time.Sleep(100 * time.Millisecond) - var reply structs.IndexedCoordinate - req := structs.CoordinateGetRequest{ - Datacenter: agent1.config.Datacenter, - Node: agent1.config.NodeName, - } - if err := agent1.RPC("Coordinate.Get", &req, &reply); err != nil { + if err := agent.RPC("Coordinate.Get", &req, &reply); err != nil { t.Fatalf("err: %v", err) } if reply.Coord == nil { t.Fatalf("should get a coordinate") } - - var reply2 structs.IndexedCoordinate - req2 := structs.CoordinateGetRequest{ - Datacenter: agent2.config.Datacenter, - Node: agent2.config.NodeName, - } - if err := agent1.RPC("Coordinate.Get", &req2, &reply2); err != nil { - t.Fatalf("err: %v", err) - } - if reply2.Coord == nil { - t.Fatalf("should get a coordinate") - } } diff --git a/consul/client.go b/consul/client.go index d1707ec9b1..f58de052d8 100644 --- a/consul/client.go +++ b/consul/client.go @@ -380,7 +380,8 @@ func (c *Client) Stats() map[string]map[string]string { return stats } -// GetCoordinate returns the network coordinate of the receiver +// GetCoordinate returns the network coordinate of the current node, as +// maintained by Serf. func (c *Client) GetCoordinate() *coordinate.Coordinate { return c.serf.GetCoordinate() } diff --git a/consul/config.go b/consul/config.go index ad30e3aa65..5e87087f23 100644 --- a/consul/config.go +++ b/consul/config.go @@ -206,13 +206,14 @@ type Config struct { // EnableCoordinates enables features related to network coordinates. EnableCoordinates bool - // CoordinateUpdatePeriod controls how long a server batches coordinate updates - // before applying them in a Raft transaction. A larger period leads to fewer - // Raft transactions, but also the stored coordinates being more stale. + // CoordinateUpdatePeriod controls how long a server batches coordinate + // updates before applying them in a Raft transaction. A larger period + // leads to fewer Raft transactions, but also the stored coordinates + // being more stale. CoordinateUpdatePeriod time.Duration // CoordinateUpdateMaxBatchSize controls the maximum number of updates a - // server batches before applying them in a Raft transaction + // server batches before applying them in a Raft transaction. CoordinateUpdateMaxBatchSize int } @@ -269,7 +270,7 @@ func DefaultConfig() *Config { TombstoneTTLGranularity: 30 * time.Second, SessionTTLMin: 10 * time.Second, EnableCoordinates: true, - CoordinateUpdatePeriod: time.Duration(30) * time.Second, + CoordinateUpdatePeriod: 30 * time.Second, CoordinateUpdateMaxBatchSize: 1000, } diff --git a/consul/coordinate_endpoint.go b/consul/coordinate_endpoint.go index b2549fbd6c..9c130e80b8 100644 --- a/consul/coordinate_endpoint.go +++ b/consul/coordinate_endpoint.go @@ -7,15 +7,68 @@ import ( "github.com/hashicorp/consul/consul/structs" ) +// Coordinate manages queries and updates for network coordinates. type Coordinate struct { - srv *Server - updateLastSent time.Time - updateBuffer []*structs.CoordinateUpdateRequest + // srv is a pointer back to the server. + srv *Server + + // updateLastSent is the last time we flushed pending coordinate updates + // to the Raft log. CoordinateUpdatePeriod is used to control how long we + // wait before doing an update (that time, or hitting more than the + // configured CoordinateUpdateMaxBatchSize, whichever comes first). + updateLastSent time.Time + + // updateBuffer holds the pending coordinate updates, waiting to be + // flushed to the Raft log. + updateBuffer []*structs.CoordinateUpdateRequest + + // updateBufferLock manages concurrent access to updateBuffer. updateBufferLock sync.Mutex } -// GetLAN returns the the LAN coordinate of a node. -func (c *Coordinate) GetLAN(args *structs.NodeSpecificRequest, reply *structs.IndexedCoordinate) error { +// NewCoordinate returns a new Coordinate endpoint. +func NewCoordinate(srv *Server) *Coordinate { + return &Coordinate{ + srv: srv, + updateLastSent: time.Now(), + } +} + +// Update handles requests to update the LAN coordinate of a node. +func (c *Coordinate) Update(args *structs.CoordinateUpdateRequest, reply *struct{}) error { + if done, err := c.srv.forward("Coordinate.Update", args, args, reply); done { + return err + } + + c.updateBufferLock.Lock() + defer c.updateBufferLock.Unlock() + c.updateBuffer = append(c.updateBuffer, args) + + // Process updates in batches to avoid tons of small transactions against + // the Raft log. + shouldFlush := time.Since(c.updateLastSent) > c.srv.config.CoordinateUpdatePeriod || + len(c.updateBuffer) > c.srv.config.CoordinateUpdateMaxBatchSize + if shouldFlush { + // This transaction could take a while so we don't block here. + buf := c.updateBuffer + go func() { + _, err := c.srv.raftApply(structs.CoordinateRequestType, buf) + if err != nil { + c.srv.logger.Printf("[ERR] consul.coordinate: Update failed: %v", err) + } + }() + + // We clear the buffer regardless of whether the raft transaction + // succeeded, just so the buffer doesn't keep growing without bound. + c.updateLastSent = time.Now() + c.updateBuffer = nil + } + + return nil +} + +// Get returns the coordinate of the given node in the LAN. +func (c *Coordinate) Get(args *structs.NodeSpecificRequest, reply *structs.IndexedCoordinate) error { if done, err := c.srv.forward("Coordinate.GetLAN", args, args, reply); done { return err } @@ -35,61 +88,3 @@ func (c *Coordinate) GetLAN(args *structs.NodeSpecificRequest, reply *structs.In return err }) } - -// GetWAN returns the WAN coordinates of the servers in a given datacenter. -// -// Note that the server does not necessarily know about *all* servers in the given datacenter. -// It just returns the coordinates of those that it knows. -func (c *Coordinate) GetWAN(args *structs.DCSpecificRequest, reply *structs.CoordinateList) error { - if args.Datacenter == c.srv.config.Datacenter { - reply.Coords = make([]structs.Coordinate, 1) - reply.Coords[0] = structs.Coordinate{ - Node: c.srv.config.NodeName, - Coord: c.srv.GetWANCoordinate(), - } - } else { - servers := c.srv.remoteConsuls[args.Datacenter] // servers in the specified DC - reply.Coords = make([]structs.Coordinate, 0) - for i := 0; i < len(servers); i++ { - if coord := c.srv.serfWAN.GetCachedCoordinate(servers[i].Name); coord != nil { - reply.Coords = append(reply.Coords, structs.Coordinate{ - Node: servers[i].Name, - Coord: coord, - }) - } - } - } - - return nil -} - -func flushCoordinates(c *Coordinate, buf []*structs.CoordinateUpdateRequest) { - _, err := c.srv.raftApply(structs.CoordinateRequestType, buf) - if err != nil { - c.srv.logger.Printf("[ERR] consul.coordinate: Update failed: %v", err) - } -} - -// Update updates the the LAN coordinate of a node. -func (c *Coordinate) Update(args *structs.CoordinateUpdateRequest, reply *struct{}) error { - if done, err := c.srv.forward("Coordinate.Update", args, args, reply); done { - return err - } - - c.updateBufferLock.Lock() - defer c.updateBufferLock.Unlock() - c.updateBuffer = append(c.updateBuffer, args) - - if time.Since(c.updateLastSent) > c.srv.config.CoordinateUpdatePeriod || len(c.updateBuffer) > c.srv.config.CoordinateUpdateMaxBatchSize { - c.srv.logger.Printf("sending update for %v", args.Node) - // Apply the potentially time-consuming transaction out of band - go flushCoordinates(c, c.updateBuffer) - - // We clear the buffer regardless of whether the raft transaction succeeded, just so the - // buffer doesn't keep growing without bound. - c.updateLastSent = time.Now() - c.updateBuffer = nil - } - - return nil -} diff --git a/consul/coordinate_endpoint_test.go b/consul/coordinate_endpoint_test.go index 8c7c7d0b1d..3c06afc4a0 100644 --- a/consul/coordinate_endpoint_test.go +++ b/consul/coordinate_endpoint_test.go @@ -13,67 +13,68 @@ import ( "github.com/hashicorp/serf/coordinate" ) -// getRandomCoordinate generates a random coordinate. -func getRandomCoordinate() *coordinate.Coordinate { +// generateRandomCoordinate creates a random coordinate. This mucks with the +// underlying structure directly, so it's not really useful for any particular +// position in the network, but it's a good payload to send through to make +// sure things come out the other side or get stored correctly. +func generateRandomCoordinate() *coordinate.Coordinate { config := coordinate.DefaultConfig() - // Randomly apply updates between n clients - n := 5 - clients := make([]*coordinate.Client, n) - for i := 0; i < n; i++ { - clients[i], _ = coordinate.NewClient(config) + coord := coordinate.NewCoordinate(config) + for i := range coord.Vec { + coord.Vec[i] = rand.NormFloat64() } - - for i := 0; i < n*100; i++ { - k1 := rand.Intn(n) - k2 := rand.Intn(n) - if k1 == k2 { - continue - } - clients[k1].Update(clients[k2].GetCoordinate(), time.Duration(rand.Int63())*time.Microsecond) - } - return clients[rand.Intn(n)].GetCoordinate() + coord.Error = rand.NormFloat64() + coord.Adjustment = rand.NormFloat64() + return coord } -func coordinatesEqual(a, b *coordinate.Coordinate) bool { - return reflect.DeepEqual(a, b) +// verifyCoordinatesEqual will compare a and b and fail if they are not exactly +// equal (no floating point fuzz is considered). +func verifyCoordinatesEqual(t *testing.T, a, b *coordinate.Coordinate) { + if !reflect.DeepEqual(a, b) { + t.Fatalf("coordinates are not equal: %v != %v", a, b) + } } func TestCoordinate_Update(t *testing.T) { name := fmt.Sprintf("Node %d", getPort()) dir1, config1 := testServerConfig(t, name) - config1.CoordinateUpdatePeriod = 1000 * time.Millisecond + defer os.RemoveAll(dir1) + + config1.CoordinateUpdatePeriod = 1 * time.Second + config1.CoordinateUpdateMaxBatchSize = 5 s1, err := NewServer(config1) if err != nil { t.Fatal(err) } - defer os.RemoveAll(dir1) defer s1.Shutdown() client := rpcClient(t, s1) defer client.Close() - testutil.WaitForLeader(t, client.Call, "dc1") arg1 := structs.CoordinateUpdateRequest{ Datacenter: "dc1", Node: "node1", - Op: structs.CoordinateSet, - Coord: getRandomCoordinate(), + Op: structs.CoordinateUpdate, + Coord: generateRandomCoordinate(), } arg2 := structs.CoordinateUpdateRequest{ Datacenter: "dc1", Node: "node2", - Op: structs.CoordinateSet, - Coord: getRandomCoordinate(), + Op: structs.CoordinateUpdate, + Coord: generateRandomCoordinate(), } + // Send an update for the first node. var out struct{} if err := client.Call("Coordinate.Update", &arg1, &out); err != nil { t.Fatalf("err: %v", err) } - // Verify + // Make sure the update did not yet apply because the batching thresholds + // haven't yet been met. state := s1.fsm.State() _, d, err := state.CoordinateGet("node1") if err != nil { @@ -83,12 +84,15 @@ func TestCoordinate_Update(t *testing.T) { t.Fatalf("should be nil because the update should be batched") } - // Wait a while and send another update; this time the updates should be sent + // Wait a while and send another update. This time both updates should + // be applied. time.Sleep(2 * s1.config.CoordinateUpdatePeriod) if err := client.Call("Coordinate.Update", &arg2, &out); err != nil { t.Fatalf("err: %v", err) } - // Yield the current goroutine to allow the goroutine that sends the updates to run + + // Wait a little while so the flush goroutine can run, then make sure + // both coordinates made it in. time.Sleep(100 * time.Millisecond) _, d, err = state.CoordinateGet("node1") @@ -98,9 +102,7 @@ func TestCoordinate_Update(t *testing.T) { if d == nil { t.Fatalf("should return a coordinate but it's nil") } - if !coordinatesEqual(d.Coord, arg1.Coord) { - t.Fatalf("should be equal\n%v\n%v", d.Coord, arg1.Coord) - } + verifyCoordinatesEqual(t, d.Coord, arg1.Coord) _, d, err = state.CoordinateGet("node2") if err != nil { @@ -109,12 +111,32 @@ func TestCoordinate_Update(t *testing.T) { if d == nil { t.Fatalf("should return a coordinate but it's nil") } - if !coordinatesEqual(d.Coord, arg2.Coord) { - t.Fatalf("should be equal\n%v\n%v", d.Coord, arg2.Coord) + verifyCoordinatesEqual(t, d.Coord, arg2.Coord) + + // Now try spamming coordinates and make sure it flushes when the batch + // size is hit. + for i := 0; i < (s1.config.CoordinateUpdateMaxBatchSize + 1); i++ { + arg1.Coord = generateRandomCoordinate() + if err := client.Call("Coordinate.Update", &arg1, &out); err != nil { + t.Fatalf("err: %v", err) + } } + + // Wait a little while so the flush goroutine can run, then make sure + // the last coordinate update made it in. + time.Sleep(100 * time.Millisecond) + + _, d, err = state.CoordinateGet("node1") + if err != nil { + t.Fatalf("err: %v", err) + } + if d == nil { + t.Fatalf("should return a coordinate but it's nil") + } + verifyCoordinatesEqual(t, d.Coord, arg1.Coord) } -func TestCoordinate_GetLAN(t *testing.T) { +func TestCoordinate_Get(t *testing.T) { dir1, s1 := testServer(t) defer os.RemoveAll(dir1) defer s1.Shutdown() @@ -126,103 +148,39 @@ func TestCoordinate_GetLAN(t *testing.T) { arg := structs.CoordinateUpdateRequest{ Datacenter: "dc1", Node: "node1", - Op: structs.CoordinateSet, - Coord: getRandomCoordinate(), + Op: structs.CoordinateUpdate, + Coord: generateRandomCoordinate(), } + // Send an initial update, waiting a little while for the flush goroutine + // to run. var out struct{} if err := client.Call("Coordinate.Update", &arg, &out); err != nil { t.Fatalf("err: %v", err) } - // Yield the current goroutine to allow the goroutine that sends the updates to run time.Sleep(100 * time.Millisecond) - // Get via RPC - out2 := structs.IndexedCoordinate{} + // Query the coordinate via RPC. arg2 := structs.NodeSpecificRequest{ Datacenter: "dc1", Node: "node1", } - if err := client.Call("Coordinate.GetLAN", &arg2, &out2); err != nil { + coord := structs.IndexedCoordinate{} + if err := client.Call("Coordinate.Get", &arg2, &coord); err != nil { t.Fatalf("err: %v", err) } - if !coordinatesEqual(out2.Coord, arg.Coord) { - t.Fatalf("should be equal\n%v\n%v", out2.Coord, arg.Coord) - } + verifyCoordinatesEqual(t, coord.Coord, arg.Coord) - // Now let's override the original coordinate; Coordinate.Get should return - // the latest coordinate - arg.Coord = getRandomCoordinate() + // Send another coordinate update, waiting after for the flush. + arg.Coord = generateRandomCoordinate() if err := client.Call("Coordinate.Update", &arg, &out); err != nil { t.Fatalf("err: %v", err) } - // Yield the current goroutine to allow the goroutine that sends the updates to run time.Sleep(100 * time.Millisecond) - if err := client.Call("Coordinate.GetLAN", &arg2, &out2); err != nil { + // Now re-query and make sure the results are fresh. + if err := client.Call("Coordinate.Get", &arg2, &coord); err != nil { t.Fatalf("err: %v", err) } - if !coordinatesEqual(out2.Coord, arg.Coord) { - t.Fatalf("should be equal\n%v\n%v", out2.Coord, arg.Coord) - } -} - -func TestCoordinate_GetWAN(t *testing.T) { - // Create 1 server in dc1, 2 servers in dc2 - dir1, s1 := testServerDC(t, "dc1") - defer os.RemoveAll(dir1) - defer s1.Shutdown() - - dir2, s2 := testServerDC(t, "dc2") - defer os.RemoveAll(dir2) - defer s2.Shutdown() - - dir3, s3 := testServerDC(t, "dc2") - defer os.RemoveAll(dir3) - defer s3.Shutdown() - - client := rpcClient(t, s1) - defer client.Close() - testutil.WaitForLeader(t, client.Call, "dc1") - - // Try to join - addr := fmt.Sprintf("127.0.0.1:%d", - s1.config.SerfWANConfig.MemberlistConfig.BindPort) - if _, err := s2.JoinWAN([]string{addr}); err != nil { - t.Fatalf("err: %v", err) - } - if _, err := s3.JoinWAN([]string{addr}); err != nil { - t.Fatalf("err: %v", err) - } - - // Check the members - testutil.WaitForResult(func() (bool, error) { - return len(s1.WANMembers()) == 3, nil - }, func(err error) { - t.Fatalf("bad len") - }) - - // Wait for coordinates to be exchanged - time.Sleep(s1.config.SerfWANConfig.MemberlistConfig.ProbeInterval * 2) - - var coords structs.CoordinateList - arg := structs.DCSpecificRequest{ - Datacenter: "dc1", - } - if err := client.Call("Coordinate.GetWAN", &arg, &coords); err != nil { - t.Fatalf("err: %v", err) - } - if len(coords.Coords) != 1 { - t.Fatalf("there is 1 server in dc1") - } - - arg = structs.DCSpecificRequest{ - Datacenter: "dc2", - } - if err := client.Call("Coordinate.GetWAN", &arg, &coords); err != nil { - t.Fatalf("err: %v", err) - } - if len(coords.Coords) != 2 { - t.Fatalf("there are 2 servers in dc2") - } + verifyCoordinatesEqual(t, coord.Coord, arg.Coord) } diff --git a/consul/fsm.go b/consul/fsm.go index 22a40f313d..d212bdb8a4 100644 --- a/consul/fsm.go +++ b/consul/fsm.go @@ -253,11 +253,10 @@ func (c *consulFSM) applyCoordinateOperation(buf []byte, index uint64) interface if err := structs.Decode(buf, &reqs); err != nil { panic(fmt.Errorf("failed to decode request: %v", err)) } - for i := 0; i < len(reqs); i++ { - req := reqs[i] + for _, req := range reqs { defer metrics.MeasureSince([]string{"consul", "fsm", "coordinate", string(req.Op)}, time.Now()) switch req.Op { - case structs.CoordinateSet: + case structs.CoordinateUpdate: coord := &structs.Coordinate{Node: req.Node, Coord: req.Coord} if err := c.state.CoordinateUpdate(index, coord); err != nil { return err diff --git a/consul/fsm_test.go b/consul/fsm_test.go index 8c1d56497e..6cee647a0b 100644 --- a/consul/fsm_test.go +++ b/consul/fsm_test.go @@ -740,13 +740,19 @@ func TestFSM_CoordinateUpdate(t *testing.T) { } defer fsm.Close() - nodeName := "Node1" - reqs := make([]*structs.CoordinateUpdateRequest, 1) + // Write a batch of two coordinates. + reqs := make([]*structs.CoordinateUpdateRequest, 2) reqs[0] = &structs.CoordinateUpdateRequest{ Datacenter: "dc1", - Node: nodeName, - Op: structs.CoordinateSet, - Coord: getRandomCoordinate(), + Node: "node1", + Op: structs.CoordinateUpdate, + Coord: generateRandomCoordinate(), + } + reqs[1] = &structs.CoordinateUpdateRequest{ + Datacenter: "dc1", + Node: "node2", + Op: structs.CoordinateUpdate, + Coord: generateRandomCoordinate(), } buf, err := structs.Encode(structs.CoordinateRequestType, reqs) if err != nil { @@ -757,17 +763,24 @@ func TestFSM_CoordinateUpdate(t *testing.T) { t.Fatalf("resp: %v", resp) } - // Verify key is set - _, d, err := fsm.state.CoordinateGet(nodeName) + // Read back the two coordinates to make sure they got updated. + _, d, err := fsm.state.CoordinateGet(reqs[0].Node) if err != nil { t.Fatalf("err: %v", err) } if d == nil { t.Fatalf("missing") } - if !coordinatesEqual(reqs[0].Coord, d.Coord) { - t.Fatalf("wrong coordinate") + verifyCoordinatesEqual(t, reqs[0].Coord, d.Coord) + + _, d, err = fsm.state.CoordinateGet(reqs[1].Node) + if err != nil { + t.Fatalf("err: %v", err) } + if d == nil { + t.Fatalf("missing") + } + verifyCoordinatesEqual(t, reqs[1].Coord, d.Coord) } func TestFSM_SessionCreate_Destroy(t *testing.T) { diff --git a/consul/server.go b/consul/server.go index cdb51d389e..60b03469e1 100644 --- a/consul/server.go +++ b/consul/server.go @@ -309,12 +309,11 @@ func (s *Server) setupSerf(conf *serf.Config, ch chan serf.Event, path string, w return nil, err } + // Let Serf cache coordinates for only the WAN configuration where the + // number of nodes should be small, and where we serve these directly + // from Serf because they aren't managed in the catalog. conf.EnableCoordinates = s.config.EnableCoordinates - if conf.EnableCoordinates && wan { - // Cache coordinates only if it's the wan network where the number of nodes is - // reasonably low. - conf.CacheCoordinates = true - } + conf.CacheCoordinates = s.config.EnableCoordinates && wan return serf.Create(conf) } @@ -406,10 +405,7 @@ func (s *Server) setupRPC(tlsWrap tlsutil.DCWrapper) error { s.endpoints.Session = &Session{s} s.endpoints.Internal = &Internal{s} s.endpoints.ACL = &ACL{s} - s.endpoints.Coordinate = &Coordinate{ - srv: s, - updateLastSent: time.Now(), - } + s.endpoints.Coordinate = NewCoordinate(s) // Register the handlers s.rpcServer.Register(s.endpoints.Status) @@ -706,12 +702,12 @@ func (s *Server) Stats() map[string]map[string]string { return stats } -// GetLANCoordinate returns the LAN coordinate of the server +// GetLANCoordinate returns the coordinate of the server in the LAN gossip pool. func (s *Server) GetLANCoordinate() *coordinate.Coordinate { return s.serfLAN.GetCoordinate() } -// GetWANCoordinate returns the WAN coordinate of the server +// GetWANCoordinate returns the coordinate of the server in the WAN gossip pool. func (s *Server) GetWANCoordinate() *coordinate.Coordinate { return s.serfWAN.GetCoordinate() } diff --git a/consul/server_test.go b/consul/server_test.go index 56ce691c1c..e96c859ceb 100644 --- a/consul/server_test.go +++ b/consul/server_test.go @@ -68,7 +68,7 @@ func testServerConfig(t *testing.T, NodeName string) (string, *Config) { config.ReconcileInterval = 100 * time.Millisecond config.EnableCoordinates = true - config.CoordinateUpdatePeriod = 0 // make updates instant + config.CoordinateUpdatePeriod = 0 * time.Millisecond return dir, config } diff --git a/consul/structs/structs.go b/consul/structs/structs.go index 232ac54439..4af5772aac 100644 --- a/consul/structs/structs.go +++ b/consul/structs/structs.go @@ -620,28 +620,36 @@ type ACLPolicy struct { QueryMeta } -// Coordinate stores a mapping from a node to its network coordinate +// Coordinate stores a node name with its associated network coordinate. This is +// used when querying WAN coordinates. type Coordinate struct { Node string Coord *coordinate.Coordinate } +// CoordinateList is a list of Coordinate structs. This is used when querying WAN +// coordinates. type CoordinateList struct { Coords []Coordinate } +// IndexedCoordinate is used to represent a single node's coordinate from the state +// store. This is used when querying LAN coordinates. type IndexedCoordinate struct { Coord *coordinate.Coordinate QueryMeta } +// CoordinateOp is used for encoding coordinate-related RPC requests. type CoordinateOp string const ( - CoordinateSet CoordinateOp = "set" + // CoordinateUpdate is used to update a node's coordinates in the catalog. + CoordinateUpdate CoordinateOp = "update" ) -// CoordinateUpdateRequest is used to update the network coordinate of a given node +// CoordinateUpdateRequest is used to update the network coordinate of a given +// node. type CoordinateUpdateRequest struct { Datacenter string Node string @@ -650,6 +658,7 @@ type CoordinateUpdateRequest struct { WriteRequest } +// RequestDatacenter returns the datacenter for a given update request. func (c *CoordinateUpdateRequest) RequestDatacenter() string { return c.Datacenter }