diff --git a/command/agent/agent.go b/command/agent/agent.go index ab913c9f99..eafb6720ff 100644 --- a/command/agent/agent.go +++ b/command/agent/agent.go @@ -580,7 +580,6 @@ func (a *Agent) sendCoordinate() { req := structs.CoordinateUpdateRequest{ Datacenter: a.config.Datacenter, Node: a.config.NodeName, - Op: structs.CoordinateUpdate, Coord: c, WriteRequest: structs.WriteRequest{Token: a.config.ACLToken}, } diff --git a/command/agent/config.go b/command/agent/config.go index 6f0b3a67ed..831ffa0286 100644 --- a/command/agent/config.go +++ b/command/agent/config.go @@ -376,9 +376,10 @@ type Config struct { DisableCoordinates bool `mapstructure:"disable_coordinates" json:"-"` // SyncCoordinateInterval controls the interval for sending network - // coordinates to the server. Defaults to every 15s, but scales up as + // coordinates to the server. Defaults to every 20s, but scales up as // the number of nodes increases in the network, to prevent servers from - // being overwhelmed. + // being overwhelmed. If you update this, you may need to adjust the + // tuning of CoordinateUpdatePeriod and CoordinateUpdateMaxBatchSize. SyncCoordinateInterval time.Duration `mapstructure:"-" json:"-"` // Checks holds the provided check definitions @@ -476,7 +477,7 @@ func DefaultConfig() *Config { CheckUpdateInterval: 5 * time.Minute, AEInterval: time.Minute, DisableCoordinates: false, - SyncCoordinateInterval: 15 * time.Second, + SyncCoordinateInterval: 20 * time.Second, ACLTTL: 30 * time.Second, ACLDownPolicy: "extend-cache", ACLDefaultPolicy: "allow", diff --git a/command/agent/local_test.go b/command/agent/local_test.go index 812379f3eb..d456d387bb 100644 --- a/command/agent/local_test.go +++ b/command/agent/local_test.go @@ -9,6 +9,7 @@ import ( "github.com/hashicorp/consul/consul/structs" "github.com/hashicorp/consul/testutil" + "github.com/hashicorp/serf/coordinate" ) func TestAgentAntiEntropy_Services(t *testing.T) { @@ -807,7 +808,8 @@ func TestAgent_nestedPauseResume(t *testing.T) { func TestAgent_sendCoordinate(t *testing.T) { conf := nextConfig() conf.SyncCoordinateInterval = 10 * time.Millisecond - conf.ConsulConfig.CoordinateUpdatePeriod = 0 * time.Millisecond + conf.ConsulConfig.CoordinateUpdatePeriod = 100 * time.Millisecond + conf.ConsulConfig.CoordinateUpdateMaxBatchSize = 20 dir, agent := makeAgent(t, conf) defer os.RemoveAll(dir) defer agent.Shutdown() @@ -815,7 +817,7 @@ func TestAgent_sendCoordinate(t *testing.T) { testutil.WaitForLeader(t, agent.RPC, "dc1") // Wait a little while for an update. - time.Sleep(3 * conf.SyncCoordinateInterval) + time.Sleep(2 * conf.ConsulConfig.CoordinateUpdatePeriod) // Make sure the coordinate is present. req := structs.NodeSpecificRequest{ @@ -829,4 +831,45 @@ func TestAgent_sendCoordinate(t *testing.T) { if reply.Coord == nil { t.Fatalf("should get a coordinate") } + + // Start spamming for a little while to get rate limit errors back from + // the server. + conf.SyncCoordinateInterval = 1 * time.Millisecond + time.Sleep(2 * conf.ConsulConfig.CoordinateUpdatePeriod) + + // Slow down and let the server catch up. + conf.SyncCoordinateInterval = 10 * time.Millisecond + time.Sleep(2 * conf.ConsulConfig.CoordinateUpdatePeriod) + + // Inject a random coordinate so we can confirm that the periodic process + // is still able to update it. + zeroCoord := &coordinate.Coordinate{} + func() { + req := structs.CoordinateUpdateRequest{ + Datacenter: agent.config.Datacenter, + Node: agent.config.NodeName, + Coord: zeroCoord, + WriteRequest: structs.WriteRequest{Token: agent.config.ACLToken}, + } + var reply struct{} + if err := agent.RPC("Coordinate.Update", &req, &reply); err != nil { + t.Fatalf("err: %v", err) + } + }() + + // Wait a little while for the injected update, as well as periodic ones + // to fire. + time.Sleep(2 * conf.ConsulConfig.CoordinateUpdatePeriod) + + // Make sure the injected coordinate is not the one that's present. + req = structs.NodeSpecificRequest{ + Datacenter: agent.config.Datacenter, + Node: agent.config.NodeName, + } + if err := agent.RPC("Coordinate.Get", &req, &reply); err != nil { + t.Fatalf("err: %v", err) + } + if reflect.DeepEqual(zeroCoord, reply.Coord) { + t.Fatalf("should not have gotten the zero coordinate") + } } diff --git a/command/agent/util.go b/command/agent/util.go index 5f0bcff511..a749836ff5 100644 --- a/command/agent/util.go +++ b/command/agent/util.go @@ -18,14 +18,17 @@ import ( ) const ( - // This scale factor means we will add a minute after we - // cross 128 nodes, another at 256, another at 512, etc. - // By 8192 nodes, we will scale up by a factor of 8 + // This scale factor means we will add a minute after we cross 128 nodes, + // another at 256, another at 512, etc. By 8192 nodes, we will scale up + // by a factor of 8. + // + // If you update this, you may need to adjust the tuning of + // CoordinateUpdatePeriod and CoordinateUpdateMaxBatchSize. aeScaleThreshold = 128 ) -// aeScale is used to scale the time interval at which anti-entropy -// take place. It is used to prevent saturation as the cluster size grows +// aeScale is used to scale the time interval at which anti-entropy and coordinate +// updates take place. It is used to prevent saturation as the cluster size grows. func aeScale(interval time.Duration, n int) time.Duration { // Don't scale until we cross the threshold if n <= aeScaleThreshold { diff --git a/consul/config.go b/consul/config.go index 702601deab..e307a1fa0d 100644 --- a/consul/config.go +++ b/consul/config.go @@ -255,23 +255,29 @@ func DefaultConfig() *Config { } conf := &Config{ - Datacenter: DefaultDC, - NodeName: hostname, - RPCAddr: DefaultRPCAddr, - RaftConfig: raft.DefaultConfig(), - SerfLANConfig: serf.DefaultConfig(), - SerfWANConfig: serf.DefaultConfig(), - ReconcileInterval: 60 * time.Second, - ProtocolVersion: ProtocolVersionMax, - ACLTTL: 30 * time.Second, - ACLDefaultPolicy: "allow", - ACLDownPolicy: "extend-cache", - TombstoneTTL: 15 * time.Minute, - TombstoneTTLGranularity: 30 * time.Second, - SessionTTLMin: 10 * time.Second, - DisableCoordinates: false, - CoordinateUpdatePeriod: 30 * time.Second, - CoordinateUpdateMaxBatchSize: 1000, + Datacenter: DefaultDC, + NodeName: hostname, + RPCAddr: DefaultRPCAddr, + RaftConfig: raft.DefaultConfig(), + SerfLANConfig: serf.DefaultConfig(), + SerfWANConfig: serf.DefaultConfig(), + ReconcileInterval: 60 * time.Second, + ProtocolVersion: ProtocolVersionMax, + ACLTTL: 30 * time.Second, + ACLDefaultPolicy: "allow", + ACLDownPolicy: "extend-cache", + TombstoneTTL: 15 * time.Minute, + TombstoneTTLGranularity: 30 * time.Second, + SessionTTLMin: 10 * time.Second, + DisableCoordinates: false, + + // SyncCoordinateInterval defaults to 20 seconds, and scales up + // as the number of nodes in the cluster goes up. For 100k nodes, + // it will move up to 201 seconds, which gives an update rate of + // just under 500 updates per second. We will split this into 2 + // batches. + CoordinateUpdatePeriod: 500 * time.Millisecond, + CoordinateUpdateMaxBatchSize: 250, } // Increase our reap interval to 3 days instead of 24h. diff --git a/consul/coordinate_endpoint.go b/consul/coordinate_endpoint.go index 9c130e80b8..b97d2442af 100644 --- a/consul/coordinate_endpoint.go +++ b/consul/coordinate_endpoint.go @@ -1,7 +1,7 @@ package consul import ( - "sync" + "fmt" "time" "github.com/hashicorp/consul/consul/structs" @@ -12,56 +12,69 @@ type Coordinate struct { // srv is a pointer back to the server. srv *Server - // updateLastSent is the last time we flushed pending coordinate updates - // to the Raft log. CoordinateUpdatePeriod is used to control how long we - // wait before doing an update (that time, or hitting more than the - // configured CoordinateUpdateMaxBatchSize, whichever comes first). - updateLastSent time.Time - - // updateBuffer holds the pending coordinate updates, waiting to be - // flushed to the Raft log. - updateBuffer []*structs.CoordinateUpdateRequest - - // updateBufferLock manages concurrent access to updateBuffer. - updateBufferLock sync.Mutex + // updateCh receives coordinate updates and applies them to the raft log + // in batches so that we don't create tons of tiny transactions. + updateCh chan *structs.Coordinate } // NewCoordinate returns a new Coordinate endpoint. func NewCoordinate(srv *Server) *Coordinate { - return &Coordinate{ - srv: srv, - updateLastSent: time.Now(), + len := srv.config.CoordinateUpdateMaxBatchSize + c := &Coordinate{ + srv: srv, + updateCh: make(chan *structs.Coordinate, len), + } + + // This will flush all pending updates at a fixed period. + go func() { + for { + select { + case <-time.After(srv.config.CoordinateUpdatePeriod): + c.batchApplyUpdates() + case <-srv.shutdownCh: + return + } + } + }() + + return c +} + +// batchApplyUpdates is a non-blocking routine that applies all pending updates +// to the Raft log. +func (c *Coordinate) batchApplyUpdates() { + var updates []*structs.Coordinate + for done := false; !done; { + select { + case update := <-c.updateCh: + updates = append(updates, update) + default: + done = true + } + } + + if len(updates) > 0 { + _, err := c.srv.raftApply(structs.CoordinateBatchUpdateType, updates) + if err != nil { + c.srv.logger.Printf("[ERR] consul.coordinate: Batch update failed: %v", err) + } } } -// Update handles requests to update the LAN coordinate of a node. +// Update inserts or updates the LAN coordinate of a node. func (c *Coordinate) Update(args *structs.CoordinateUpdateRequest, reply *struct{}) error { if done, err := c.srv.forward("Coordinate.Update", args, args, reply); done { return err } - c.updateBufferLock.Lock() - defer c.updateBufferLock.Unlock() - c.updateBuffer = append(c.updateBuffer, args) - - // Process updates in batches to avoid tons of small transactions against - // the Raft log. - shouldFlush := time.Since(c.updateLastSent) > c.srv.config.CoordinateUpdatePeriod || - len(c.updateBuffer) > c.srv.config.CoordinateUpdateMaxBatchSize - if shouldFlush { - // This transaction could take a while so we don't block here. - buf := c.updateBuffer - go func() { - _, err := c.srv.raftApply(structs.CoordinateRequestType, buf) - if err != nil { - c.srv.logger.Printf("[ERR] consul.coordinate: Update failed: %v", err) - } - }() - - // We clear the buffer regardless of whether the raft transaction - // succeeded, just so the buffer doesn't keep growing without bound. - c.updateLastSent = time.Now() - c.updateBuffer = nil + // Perform a non-blocking write to the channel. We'd rather spill updates + // than gum things up blocking here. + update := &structs.Coordinate{Node: args.Node, Coord: args.Coord} + select { + case c.updateCh <- update: + // This is a noop - we are done if the write went through. + default: + return fmt.Errorf("Coordinate update rate limit exceeded, increase SyncCoordinateInterval") } return nil diff --git a/consul/coordinate_endpoint_test.go b/consul/coordinate_endpoint_test.go index 3c06afc4a0..feee040f05 100644 --- a/consul/coordinate_endpoint_test.go +++ b/consul/coordinate_endpoint_test.go @@ -5,6 +5,7 @@ import ( "math/rand" "os" "reflect" + "strings" "testing" "time" @@ -29,7 +30,8 @@ func generateRandomCoordinate() *coordinate.Coordinate { } // verifyCoordinatesEqual will compare a and b and fail if they are not exactly -// equal (no floating point fuzz is considered). +// equal (no floating point fuzz is considered since we are trying to make sure +// we are getting exactly the coordinates we expect, without math on them). func verifyCoordinatesEqual(t *testing.T, a, b *coordinate.Coordinate) { if !reflect.DeepEqual(a, b) { t.Fatalf("coordinates are not equal: %v != %v", a, b) @@ -41,7 +43,7 @@ func TestCoordinate_Update(t *testing.T) { dir1, config1 := testServerConfig(t, name) defer os.RemoveAll(dir1) - config1.CoordinateUpdatePeriod = 1 * time.Second + config1.CoordinateUpdatePeriod = 500 * time.Millisecond config1.CoordinateUpdateMaxBatchSize = 5 s1, err := NewServer(config1) if err != nil { @@ -53,28 +55,29 @@ func TestCoordinate_Update(t *testing.T) { defer client.Close() testutil.WaitForLeader(t, client.Call, "dc1") + // Send an update for the first node. arg1 := structs.CoordinateUpdateRequest{ Datacenter: "dc1", Node: "node1", - Op: structs.CoordinateUpdate, Coord: generateRandomCoordinate(), } - - arg2 := structs.CoordinateUpdateRequest{ - Datacenter: "dc1", - Node: "node2", - Op: structs.CoordinateUpdate, - Coord: generateRandomCoordinate(), - } - - // Send an update for the first node. var out struct{} if err := client.Call("Coordinate.Update", &arg1, &out); err != nil { t.Fatalf("err: %v", err) } - // Make sure the update did not yet apply because the batching thresholds - // haven't yet been met. + // Send an update for the second node. + arg2 := structs.CoordinateUpdateRequest{ + Datacenter: "dc1", + Node: "node2", + Coord: generateRandomCoordinate(), + } + if err := client.Call("Coordinate.Update", &arg2, &out); err != nil { + t.Fatalf("err: %v", err) + } + + // Make sure the updates did not yet apply because the update period + // hasn't expired. state := s1.fsm.State() _, d, err := state.CoordinateGet("node1") if err != nil { @@ -83,18 +86,16 @@ func TestCoordinate_Update(t *testing.T) { if d != nil { t.Fatalf("should be nil because the update should be batched") } - - // Wait a while and send another update. This time both updates should - // be applied. - time.Sleep(2 * s1.config.CoordinateUpdatePeriod) - if err := client.Call("Coordinate.Update", &arg2, &out); err != nil { + _, d, err = state.CoordinateGet("node2") + if err != nil { t.Fatalf("err: %v", err) } + if d != nil { + t.Fatalf("should be nil because the update should be batched") + } - // Wait a little while so the flush goroutine can run, then make sure - // both coordinates made it in. - time.Sleep(100 * time.Millisecond) - + // Wait a while and the updates should get picked up. + time.Sleep(2 * s1.config.CoordinateUpdatePeriod) _, d, err = state.CoordinateGet("node1") if err != nil { t.Fatalf("err: %v", err) @@ -103,7 +104,6 @@ func TestCoordinate_Update(t *testing.T) { t.Fatalf("should return a coordinate but it's nil") } verifyCoordinatesEqual(t, d.Coord, arg1.Coord) - _, d, err = state.CoordinateGet("node2") if err != nil { t.Fatalf("err: %v", err) @@ -113,19 +113,25 @@ func TestCoordinate_Update(t *testing.T) { } verifyCoordinatesEqual(t, d.Coord, arg2.Coord) - // Now try spamming coordinates and make sure it flushes when the batch - // size is hit. - for i := 0; i < (s1.config.CoordinateUpdateMaxBatchSize + 1); i++ { + // Now try spamming coordinates and make sure it starts dropping when + // the pipe is full. + for i := 0; i < s1.config.CoordinateUpdateMaxBatchSize; i++ { arg1.Coord = generateRandomCoordinate() if err := client.Call("Coordinate.Update", &arg1, &out); err != nil { t.Fatalf("err: %v", err) } } - // Wait a little while so the flush goroutine can run, then make sure - // the last coordinate update made it in. - time.Sleep(100 * time.Millisecond) + // This one should get dropped. + arg2.Coord = generateRandomCoordinate() + err = client.Call("Coordinate.Update", &arg2, &out) + if err == nil || !strings.Contains(err.Error(), "rate limit") { + t.Fatalf("should have failed with a rate limit error, got %v", err) + } + // Wait a little while for the batch routine to run, then make sure + // all but the last coordinate update made it in. + time.Sleep(2 * s1.config.CoordinateUpdatePeriod) _, d, err = state.CoordinateGet("node1") if err != nil { t.Fatalf("err: %v", err) @@ -148,11 +154,10 @@ func TestCoordinate_Get(t *testing.T) { arg := structs.CoordinateUpdateRequest{ Datacenter: "dc1", Node: "node1", - Op: structs.CoordinateUpdate, Coord: generateRandomCoordinate(), } - // Send an initial update, waiting a little while for the flush goroutine + // Send an initial update, waiting a little while for the batch update // to run. var out struct{} if err := client.Call("Coordinate.Update", &arg, &out); err != nil { diff --git a/consul/fsm.go b/consul/fsm.go index d212bdb8a4..2b3327bbc5 100644 --- a/consul/fsm.go +++ b/consul/fsm.go @@ -89,8 +89,8 @@ func (c *consulFSM) Apply(log *raft.Log) interface{} { return c.applyACLOperation(buf[1:], log.Index) case structs.TombstoneRequestType: return c.applyTombstoneOperation(buf[1:], log.Index) - case structs.CoordinateRequestType: - return c.applyCoordinateOperation(buf[1:], log.Index) + case structs.CoordinateBatchUpdateType: + return c.applyCoordinateBatchUpdate(buf[1:], log.Index) default: if ignoreUnknown { c.logger.Printf("[WARN] consul.fsm: ignoring unknown message type (%d), upgrade to newer version", msgType) @@ -248,23 +248,18 @@ func (c *consulFSM) applyTombstoneOperation(buf []byte, index uint64) interface{ } } -func (c *consulFSM) applyCoordinateOperation(buf []byte, index uint64) interface{} { - var reqs []*structs.CoordinateUpdateRequest - if err := structs.Decode(buf, &reqs); err != nil { - panic(fmt.Errorf("failed to decode request: %v", err)) +// applyCoordinateBatchUpdate processes a batch of coordinate updates and applies +// them in a single underlying transaction. This interface isn't 1:1 with the outer +// update interface that the coordinate endpoint exposes, so we made it single +// purpose and avoided the opcode convention. +func (c *consulFSM) applyCoordinateBatchUpdate(buf []byte, index uint64) interface{} { + var updates []*structs.Coordinate + if err := structs.Decode(buf, &updates); err != nil { + panic(fmt.Errorf("failed to decode batch updates: %v", err)) } - for _, req := range reqs { - defer metrics.MeasureSince([]string{"consul", "fsm", "coordinate", string(req.Op)}, time.Now()) - switch req.Op { - case structs.CoordinateUpdate: - coord := &structs.Coordinate{Node: req.Node, Coord: req.Coord} - if err := c.state.CoordinateUpdate(index, coord); err != nil { - return err - } - default: - c.logger.Printf("[WARN] consul.fsm: Invalid Coordinate operation '%s'", req.Op) - return fmt.Errorf("Invalid Coordinate operation '%s'", req.Op) - } + defer metrics.MeasureSince([]string{"consul", "fsm", "coordinate", "batch-update"}, time.Now()) + if err := c.state.CoordinateBatchUpdate(index, updates); err != nil { + return err } return nil } diff --git a/consul/fsm_test.go b/consul/fsm_test.go index 6cee647a0b..69011ac77b 100644 --- a/consul/fsm_test.go +++ b/consul/fsm_test.go @@ -741,20 +741,17 @@ func TestFSM_CoordinateUpdate(t *testing.T) { defer fsm.Close() // Write a batch of two coordinates. - reqs := make([]*structs.CoordinateUpdateRequest, 2) - reqs[0] = &structs.CoordinateUpdateRequest{ - Datacenter: "dc1", - Node: "node1", - Op: structs.CoordinateUpdate, - Coord: generateRandomCoordinate(), + updates := []*structs.Coordinate{ + &structs.Coordinate{ + Node: "node1", + Coord: generateRandomCoordinate(), + }, + &structs.Coordinate{ + Node: "node2", + Coord: generateRandomCoordinate(), + }, } - reqs[1] = &structs.CoordinateUpdateRequest{ - Datacenter: "dc1", - Node: "node2", - Op: structs.CoordinateUpdate, - Coord: generateRandomCoordinate(), - } - buf, err := structs.Encode(structs.CoordinateRequestType, reqs) + buf, err := structs.Encode(structs.CoordinateBatchUpdateType, updates) if err != nil { t.Fatalf("err: %v", err) } @@ -764,23 +761,23 @@ func TestFSM_CoordinateUpdate(t *testing.T) { } // Read back the two coordinates to make sure they got updated. - _, d, err := fsm.state.CoordinateGet(reqs[0].Node) + _, d, err := fsm.state.CoordinateGet(updates[0].Node) if err != nil { t.Fatalf("err: %v", err) } if d == nil { t.Fatalf("missing") } - verifyCoordinatesEqual(t, reqs[0].Coord, d.Coord) + verifyCoordinatesEqual(t, updates[0].Coord, d.Coord) - _, d, err = fsm.state.CoordinateGet(reqs[1].Node) + _, d, err = fsm.state.CoordinateGet(updates[1].Node) if err != nil { t.Fatalf("err: %v", err) } if d == nil { t.Fatalf("missing") } - verifyCoordinatesEqual(t, reqs[1].Coord, d.Coord) + verifyCoordinatesEqual(t, updates[1].Coord, d.Coord) } func TestFSM_SessionCreate_Destroy(t *testing.T) { diff --git a/consul/structs/structs.go b/consul/structs/structs.go index d6c39109d6..552432a826 100644 --- a/consul/structs/structs.go +++ b/consul/structs/structs.go @@ -32,7 +32,7 @@ const ( SessionRequestType ACLRequestType TombstoneRequestType - CoordinateRequestType + CoordinateBatchUpdateType ) const ( @@ -633,20 +633,11 @@ type IndexedCoordinate struct { QueryMeta } -// CoordinateOp is used for encoding coordinate-related RPC requests. -type CoordinateOp string - -const ( - // CoordinateUpdate is used to update a node's coordinates in the catalog. - CoordinateUpdate CoordinateOp = "update" -) - // CoordinateUpdateRequest is used to update the network coordinate of a given // node. type CoordinateUpdateRequest struct { Datacenter string Node string - Op CoordinateOp Coord *coordinate.Coordinate WriteRequest }