Moves batching down into the state store and changes it to fail-fast.

* A batch of updates is applied in a single transaction.
* We no longer need an incoming update to kick things off; there's a periodic flush.
* If incoming updates overwhelm the configured flush rate, they are dropped and an error is returned.
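
A minimal, self-contained sketch of the pattern the new endpoint adopts: a bounded channel sized to the max batch, a non-blocking send that fails fast when the channel is full, and a periodic flusher that applies everything pending in one shot. The batcher, send, and flush names and the apply callback are illustrative stand-ins, not Consul's actual types.

package main

import (
	"errors"
	"fmt"
	"time"
)

type update struct {
	node  string
	coord int // stand-in for a network coordinate
}

type batcher struct {
	updateCh chan *update
	apply    func([]*update) // e.g. a single Raft/transactional apply per flush
}

func newBatcher(maxBatch int, period time.Duration, apply func([]*update)) *batcher {
	b := &batcher{updateCh: make(chan *update, maxBatch), apply: apply}
	go func() {
		for range time.Tick(period) {
			b.flush()
		}
	}()
	return b
}

// send fails fast instead of blocking when updates outrun the flush rate.
func (b *batcher) send(u *update) error {
	select {
	case b.updateCh <- u:
		return nil
	default:
		return errors.New("update rate limit exceeded")
	}
}

// flush drains everything pending and applies it as a single batch.
func (b *batcher) flush() {
	var batch []*update
	for {
		select {
		case u := <-b.updateCh:
			batch = append(batch, u)
		default:
			if len(batch) > 0 {
				b.apply(batch)
			}
			return
		}
	}
}

func main() {
	b := newBatcher(3, 50*time.Millisecond, func(batch []*update) {
		fmt.Printf("applied %d updates in one transaction\n", len(batch))
	})
	for i := 0; i < 5; i++ {
		if err := b.send(&update{node: "node1", coord: i}); err != nil {
			fmt.Println("dropped:", err) // overflow spills with an error
		}
	}
	time.Sleep(100 * time.Millisecond)
}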
James Phillips 2015-06-22 19:14:02 -07:00
parent b9d5fb0f90
commit d12aa2ffab
10 changed files with 197 additions and 144 deletions

View File

@@ -580,7 +580,6 @@ func (a *Agent) sendCoordinate() {
req := structs.CoordinateUpdateRequest{
Datacenter: a.config.Datacenter,
Node: a.config.NodeName,
Op: structs.CoordinateUpdate,
Coord: c,
WriteRequest: structs.WriteRequest{Token: a.config.ACLToken},
}

View File

@@ -376,9 +376,10 @@ type Config struct {
DisableCoordinates bool `mapstructure:"disable_coordinates" json:"-"`
// SyncCoordinateInterval controls the interval for sending network
// coordinates to the server. Defaults to every 15s, but scales up as
// coordinates to the server. Defaults to every 20s, but scales up as
// the number of nodes increases in the network, to prevent servers from
// being overwhelmed.
// being overwhelmed. If you update this, you may need to adjust the
// tuning of CoordinateUpdatePeriod and CoordinateUpdateMaxBatchSize.
SyncCoordinateInterval time.Duration `mapstructure:"-" json:"-"`
// Checks holds the provided check definitions
@@ -476,7 +477,7 @@ func DefaultConfig() *Config {
CheckUpdateInterval: 5 * time.Minute,
AEInterval: time.Minute,
DisableCoordinates: false,
SyncCoordinateInterval: 15 * time.Second,
SyncCoordinateInterval: 20 * time.Second,
ACLTTL: 30 * time.Second,
ACLDownPolicy: "extend-cache",
ACLDefaultPolicy: "allow",

View File

@@ -9,6 +9,7 @@ import (
"github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/consul/testutil"
"github.com/hashicorp/serf/coordinate"
)
func TestAgentAntiEntropy_Services(t *testing.T) {
@@ -807,7 +808,8 @@ func TestAgent_nestedPauseResume(t *testing.T) {
func TestAgent_sendCoordinate(t *testing.T) {
conf := nextConfig()
conf.SyncCoordinateInterval = 10 * time.Millisecond
conf.ConsulConfig.CoordinateUpdatePeriod = 0 * time.Millisecond
conf.ConsulConfig.CoordinateUpdatePeriod = 100 * time.Millisecond
conf.ConsulConfig.CoordinateUpdateMaxBatchSize = 20
dir, agent := makeAgent(t, conf)
defer os.RemoveAll(dir)
defer agent.Shutdown()
@@ -815,7 +817,7 @@ func TestAgent_sendCoordinate(t *testing.T) {
testutil.WaitForLeader(t, agent.RPC, "dc1")
// Wait a little while for an update.
time.Sleep(3 * conf.SyncCoordinateInterval)
time.Sleep(2 * conf.ConsulConfig.CoordinateUpdatePeriod)
// Make sure the coordinate is present.
req := structs.NodeSpecificRequest{
@@ -829,4 +831,45 @@ func TestAgent_sendCoordinate(t *testing.T) {
if reply.Coord == nil {
t.Fatalf("should get a coordinate")
}
// Start spamming for a little while to get rate limit errors back from
// the server.
conf.SyncCoordinateInterval = 1 * time.Millisecond
time.Sleep(2 * conf.ConsulConfig.CoordinateUpdatePeriod)
// Slow down and let the server catch up.
conf.SyncCoordinateInterval = 10 * time.Millisecond
time.Sleep(2 * conf.ConsulConfig.CoordinateUpdatePeriod)
// Inject a random coordinate so we can confirm that the periodic process
// is still able to update it.
zeroCoord := &coordinate.Coordinate{}
func() {
req := structs.CoordinateUpdateRequest{
Datacenter: agent.config.Datacenter,
Node: agent.config.NodeName,
Coord: zeroCoord,
WriteRequest: structs.WriteRequest{Token: agent.config.ACLToken},
}
var reply struct{}
if err := agent.RPC("Coordinate.Update", &req, &reply); err != nil {
t.Fatalf("err: %v", err)
}
}()
// Wait a little while for the injected update, as well as periodic ones
// to fire.
time.Sleep(2 * conf.ConsulConfig.CoordinateUpdatePeriod)
// Make sure the injected coordinate is not the one that's present.
req = structs.NodeSpecificRequest{
Datacenter: agent.config.Datacenter,
Node: agent.config.NodeName,
}
if err := agent.RPC("Coordinate.Get", &req, &reply); err != nil {
t.Fatalf("err: %v", err)
}
if reflect.DeepEqual(zeroCoord, reply.Coord) {
t.Fatalf("should not have gotten the zero coordinate")
}
}

View File

@@ -18,14 +18,17 @@ import (
)
const (
// This scale factor means we will add a minute after we
// cross 128 nodes, another at 256, another at 512, etc.
// By 8192 nodes, we will scale up by a factor of 8
// This scale factor means we will add a minute after we cross 128 nodes,
// another at 256, another at 512, etc. By 8192 nodes, we will scale up
// by a factor of 8.
//
// If you update this, you may need to adjust the tuning of
// CoordinateUpdatePeriod and CoordinateUpdateMaxBatchSize.
aeScaleThreshold = 128
)
// aeScale is used to scale the time interval at which anti-entropy
// take place. It is used to prevent saturation as the cluster size grows
// aeScale is used to scale the time interval at which anti-entropy and coordinate
// updates take place. It is used to prevent saturation as the cluster size grows.
func aeScale(interval time.Duration, n int) time.Duration {
// Don't scale until we cross the threshold
if n <= aeScaleThreshold {
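
A quick back-of-envelope check of the scale factor described in the comment above; this is an illustrative reconstruction consistent with that description (one extra interval per doubling of the node count past the threshold), not necessarily the exact body of aeScale:

package main

import (
	"fmt"
	"math"
	"time"
)

const aeScaleThreshold = 128

// illustrativeScale adds one interval each time the node count crosses
// another doubling of the threshold: >128 doubles, >256 triples, and so on.
func illustrativeScale(interval time.Duration, n int) time.Duration {
	if n <= aeScaleThreshold {
		return interval
	}
	multiplier := math.Ceil(math.Log2(float64(n))-math.Log2(aeScaleThreshold)) + 1
	return time.Duration(multiplier) * interval
}

func main() {
	for _, n := range []int{128, 129, 257, 8193} {
		fmt.Printf("%5d nodes -> %v\n", n, illustrativeScale(time.Minute, n))
	}
	// Prints 1m0s, 2m0s, 3m0s, and 8m0s: a factor of 8 once we're past 8192 nodes.
}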

View File

@@ -270,8 +270,14 @@ func DefaultConfig() *Config {
TombstoneTTLGranularity: 30 * time.Second,
SessionTTLMin: 10 * time.Second,
DisableCoordinates: false,
CoordinateUpdatePeriod: 30 * time.Second,
CoordinateUpdateMaxBatchSize: 1000,
// SyncCoordinateInterval defaults to 20 seconds, and scales up
// as the number of nodes in the cluster goes up. For 100k nodes,
// it will move up to 201 seconds, which gives an update rate of
// just under 500 updates per second. We will split this into 2
// batches.
CoordinateUpdatePeriod: 500 * time.Millisecond,
CoordinateUpdateMaxBatchSize: 250,
}
// Increase our reap interval to 3 days instead of 24h.
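
The arithmetic behind those defaults, worked out as a rough sketch; the 100k-node and 201-second figures come straight from the comment above, nothing here is measured:

package main

import "fmt"

func main() {
	nodes := 100000.0
	syncInterval := 201.0 // seconds between updates per node at that cluster size
	flushPeriod := 0.5    // CoordinateUpdatePeriod in seconds

	rate := nodes / syncInterval   // ~497.5 updates/s arriving at the servers
	perFlush := rate * flushPeriod // ~249 updates buffered per 500ms flush

	// Just under the 250-entry channel capacity (CoordinateUpdateMaxBatchSize),
	// so the ~500 updates/s are handled as two batches per second.
	fmt.Printf("%.1f updates/s, ~%.0f per flush\n", rate, perFlush)
}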

View File

@@ -1,7 +1,7 @@
package consul
import (
"sync"
"fmt"
"time"
"github.com/hashicorp/consul/consul/structs"
@@ -12,56 +12,69 @@ type Coordinate struct {
// srv is a pointer back to the server.
srv *Server
// updateLastSent is the last time we flushed pending coordinate updates
// to the Raft log. CoordinateUpdatePeriod is used to control how long we
// wait before doing an update (that time, or hitting more than the
// configured CoordinateUpdateMaxBatchSize, whichever comes first).
updateLastSent time.Time
// updateBuffer holds the pending coordinate updates, waiting to be
// flushed to the Raft log.
updateBuffer []*structs.CoordinateUpdateRequest
// updateBufferLock manages concurrent access to updateBuffer.
updateBufferLock sync.Mutex
// updateCh receives coordinate updates and applies them to the raft log
// in batches so that we don't create tons of tiny transactions.
updateCh chan *structs.Coordinate
}
// NewCoordinate returns a new Coordinate endpoint.
func NewCoordinate(srv *Server) *Coordinate {
return &Coordinate{
len := srv.config.CoordinateUpdateMaxBatchSize
c := &Coordinate{
srv: srv,
updateLastSent: time.Now(),
updateCh: make(chan *structs.Coordinate, len),
}
// This will flush all pending updates at a fixed period.
go func() {
for {
select {
case <-time.After(srv.config.CoordinateUpdatePeriod):
c.batchApplyUpdates()
case <-srv.shutdownCh:
return
}
}
}()
return c
}
// batchApplyUpdates is a non-blocking routine that applies all pending updates
// to the Raft log.
func (c *Coordinate) batchApplyUpdates() {
var updates []*structs.Coordinate
for done := false; !done; {
select {
case update := <-c.updateCh:
updates = append(updates, update)
default:
done = true
}
}
// Update handles requests to update the LAN coordinate of a node.
if len(updates) > 0 {
_, err := c.srv.raftApply(structs.CoordinateBatchUpdateType, updates)
if err != nil {
c.srv.logger.Printf("[ERR] consul.coordinate: Batch update failed: %v", err)
}
}
}
// Update inserts or updates the LAN coordinate of a node.
func (c *Coordinate) Update(args *structs.CoordinateUpdateRequest, reply *struct{}) error {
if done, err := c.srv.forward("Coordinate.Update", args, args, reply); done {
return err
}
c.updateBufferLock.Lock()
defer c.updateBufferLock.Unlock()
c.updateBuffer = append(c.updateBuffer, args)
// Process updates in batches to avoid tons of small transactions against
// the Raft log.
shouldFlush := time.Since(c.updateLastSent) > c.srv.config.CoordinateUpdatePeriod ||
len(c.updateBuffer) > c.srv.config.CoordinateUpdateMaxBatchSize
if shouldFlush {
// This transaction could take a while so we don't block here.
buf := c.updateBuffer
go func() {
_, err := c.srv.raftApply(structs.CoordinateRequestType, buf)
if err != nil {
c.srv.logger.Printf("[ERR] consul.coordinate: Update failed: %v", err)
}
}()
// We clear the buffer regardless of whether the raft transaction
// succeeded, just so the buffer doesn't keep growing without bound.
c.updateLastSent = time.Now()
c.updateBuffer = nil
// Perform a non-blocking write to the channel. We'd rather spill updates
// than gum things up blocking here.
update := &structs.Coordinate{Node: args.Node, Coord: args.Coord}
select {
case c.updateCh <- update:
// This is a noop - we are done if the write went through.
default:
return fmt.Errorf("Coordinate update rate limit exceeded, increase SyncCoordinateInterval")
}
return nil

View File

@@ -5,6 +5,7 @@ import (
"math/rand"
"os"
"reflect"
"strings"
"testing"
"time"
@@ -29,7 +30,8 @@ func generateRandomCoordinate() *coordinate.Coordinate {
}
// verifyCoordinatesEqual will compare a and b and fail if they are not exactly
// equal (no floating point fuzz is considered).
// equal (no floating point fuzz is considered since we are trying to make sure
// we are getting exactly the coordinates we expect, without math on them).
func verifyCoordinatesEqual(t *testing.T, a, b *coordinate.Coordinate) {
if !reflect.DeepEqual(a, b) {
t.Fatalf("coordinates are not equal: %v != %v", a, b)
@@ -41,7 +43,7 @@ func TestCoordinate_Update(t *testing.T) {
dir1, config1 := testServerConfig(t, name)
defer os.RemoveAll(dir1)
config1.CoordinateUpdatePeriod = 1 * time.Second
config1.CoordinateUpdatePeriod = 500 * time.Millisecond
config1.CoordinateUpdateMaxBatchSize = 5
s1, err := NewServer(config1)
if err != nil {
@@ -53,28 +55,29 @@ func TestCoordinate_Update(t *testing.T) {
defer client.Close()
testutil.WaitForLeader(t, client.Call, "dc1")
// Send an update for the first node.
arg1 := structs.CoordinateUpdateRequest{
Datacenter: "dc1",
Node: "node1",
Op: structs.CoordinateUpdate,
Coord: generateRandomCoordinate(),
}
arg2 := structs.CoordinateUpdateRequest{
Datacenter: "dc1",
Node: "node2",
Op: structs.CoordinateUpdate,
Coord: generateRandomCoordinate(),
}
// Send an update for the first node.
var out struct{}
if err := client.Call("Coordinate.Update", &arg1, &out); err != nil {
t.Fatalf("err: %v", err)
}
// Make sure the update did not yet apply because the batching thresholds
// haven't yet been met.
// Send an update for the second node.
arg2 := structs.CoordinateUpdateRequest{
Datacenter: "dc1",
Node: "node2",
Coord: generateRandomCoordinate(),
}
if err := client.Call("Coordinate.Update", &arg2, &out); err != nil {
t.Fatalf("err: %v", err)
}
// Make sure the updates did not yet apply because the update period
// hasn't expired.
state := s1.fsm.State()
_, d, err := state.CoordinateGet("node1")
if err != nil {
@@ -83,18 +86,16 @@ func TestCoordinate_Update(t *testing.T) {
if d != nil {
t.Fatalf("should be nil because the update should be batched")
}
// Wait a while and send another update. This time both updates should
// be applied.
time.Sleep(2 * s1.config.CoordinateUpdatePeriod)
if err := client.Call("Coordinate.Update", &arg2, &out); err != nil {
_, d, err = state.CoordinateGet("node2")
if err != nil {
t.Fatalf("err: %v", err)
}
if d != nil {
t.Fatalf("should be nil because the update should be batched")
}
// Wait a little while so the flush goroutine can run, then make sure
// both coordinates made it in.
time.Sleep(100 * time.Millisecond)
// Wait a while and the updates should get picked up.
time.Sleep(2 * s1.config.CoordinateUpdatePeriod)
_, d, err = state.CoordinateGet("node1")
if err != nil {
t.Fatalf("err: %v", err)
@@ -103,7 +104,6 @@ func TestCoordinate_Update(t *testing.T) {
t.Fatalf("should return a coordinate but it's nil")
}
verifyCoordinatesEqual(t, d.Coord, arg1.Coord)
_, d, err = state.CoordinateGet("node2")
if err != nil {
t.Fatalf("err: %v", err)
@@ -113,19 +113,25 @@ func TestCoordinate_Update(t *testing.T) {
}
verifyCoordinatesEqual(t, d.Coord, arg2.Coord)
// Now try spamming coordinates and make sure it flushes when the batch
// size is hit.
for i := 0; i < (s1.config.CoordinateUpdateMaxBatchSize + 1); i++ {
// Now try spamming coordinates and make sure it starts dropping when
// the pipe is full.
for i := 0; i < s1.config.CoordinateUpdateMaxBatchSize; i++ {
arg1.Coord = generateRandomCoordinate()
if err := client.Call("Coordinate.Update", &arg1, &out); err != nil {
t.Fatalf("err: %v", err)
}
}
// Wait a little while so the flush goroutine can run, then make sure
// the last coordinate update made it in.
time.Sleep(100 * time.Millisecond)
// This one should get dropped.
arg2.Coord = generateRandomCoordinate()
err = client.Call("Coordinate.Update", &arg2, &out)
if err == nil || !strings.Contains(err.Error(), "rate limit") {
t.Fatalf("should have failed with a rate limit error, got %v", err)
}
// Wait a little while for the batch routine to run, then make sure
// all but the last coordinate update made it in.
time.Sleep(2 * s1.config.CoordinateUpdatePeriod)
_, d, err = state.CoordinateGet("node1")
if err != nil {
t.Fatalf("err: %v", err)
@@ -148,11 +154,10 @@ func TestCoordinate_Get(t *testing.T) {
arg := structs.CoordinateUpdateRequest{
Datacenter: "dc1",
Node: "node1",
Op: structs.CoordinateUpdate,
Coord: generateRandomCoordinate(),
}
// Send an initial update, waiting a little while for the flush goroutine
// Send an initial update, waiting a little while for the batch update
// to run.
var out struct{}
if err := client.Call("Coordinate.Update", &arg, &out); err != nil {

View File

@@ -89,8 +89,8 @@ func (c *consulFSM) Apply(log *raft.Log) interface{} {
return c.applyACLOperation(buf[1:], log.Index)
case structs.TombstoneRequestType:
return c.applyTombstoneOperation(buf[1:], log.Index)
case structs.CoordinateRequestType:
return c.applyCoordinateOperation(buf[1:], log.Index)
case structs.CoordinateBatchUpdateType:
return c.applyCoordinateBatchUpdate(buf[1:], log.Index)
default:
if ignoreUnknown {
c.logger.Printf("[WARN] consul.fsm: ignoring unknown message type (%d), upgrade to newer version", msgType)
@@ -248,24 +248,19 @@ func (c *consulFSM) applyTombstoneOperation(buf []byte, index uint64) interface{
}
}
func (c *consulFSM) applyCoordinateOperation(buf []byte, index uint64) interface{} {
var reqs []*structs.CoordinateUpdateRequest
if err := structs.Decode(buf, &reqs); err != nil {
panic(fmt.Errorf("failed to decode request: %v", err))
// applyCoordinateBatchUpdate processes a batch of coordinate updates and applies
// them in a single underlying transaction. This interface isn't 1:1 with the outer
// update interface that the coordinate endpoint exposes, so we made it single
// purpose and avoided the opcode convention.
func (c *consulFSM) applyCoordinateBatchUpdate(buf []byte, index uint64) interface{} {
var updates []*structs.Coordinate
if err := structs.Decode(buf, &updates); err != nil {
panic(fmt.Errorf("failed to decode batch updates: %v", err))
}
for _, req := range reqs {
defer metrics.MeasureSince([]string{"consul", "fsm", "coordinate", string(req.Op)}, time.Now())
switch req.Op {
case structs.CoordinateUpdate:
coord := &structs.Coordinate{Node: req.Node, Coord: req.Coord}
if err := c.state.CoordinateUpdate(index, coord); err != nil {
defer metrics.MeasureSince([]string{"consul", "fsm", "coordinate", "batch-update"}, time.Now())
if err := c.state.CoordinateBatchUpdate(index, updates); err != nil {
return err
}
default:
c.logger.Printf("[WARN] consul.fsm: Invalid Coordinate operation '%s'", req.Op)
return fmt.Errorf("Invalid Coordinate operation '%s'", req.Op)
}
}
return nil
}

View File

@@ -741,20 +741,17 @@ func TestFSM_CoordinateUpdate(t *testing.T) {
defer fsm.Close()
// Write a batch of two coordinates.
reqs := make([]*structs.CoordinateUpdateRequest, 2)
reqs[0] = &structs.CoordinateUpdateRequest{
Datacenter: "dc1",
updates := []*structs.Coordinate{
&structs.Coordinate{
Node: "node1",
Op: structs.CoordinateUpdate,
Coord: generateRandomCoordinate(),
}
reqs[1] = &structs.CoordinateUpdateRequest{
Datacenter: "dc1",
},
&structs.Coordinate{
Node: "node2",
Op: structs.CoordinateUpdate,
Coord: generateRandomCoordinate(),
},
}
buf, err := structs.Encode(structs.CoordinateRequestType, reqs)
buf, err := structs.Encode(structs.CoordinateBatchUpdateType, updates)
if err != nil {
t.Fatalf("err: %v", err)
}
@@ -764,23 +761,23 @@ func TestFSM_CoordinateUpdate(t *testing.T) {
}
// Read back the two coordinates to make sure they got updated.
_, d, err := fsm.state.CoordinateGet(reqs[0].Node)
_, d, err := fsm.state.CoordinateGet(updates[0].Node)
if err != nil {
t.Fatalf("err: %v", err)
}
if d == nil {
t.Fatalf("missing")
}
verifyCoordinatesEqual(t, reqs[0].Coord, d.Coord)
verifyCoordinatesEqual(t, updates[0].Coord, d.Coord)
_, d, err = fsm.state.CoordinateGet(reqs[1].Node)
_, d, err = fsm.state.CoordinateGet(updates[1].Node)
if err != nil {
t.Fatalf("err: %v", err)
}
if d == nil {
t.Fatalf("missing")
}
verifyCoordinatesEqual(t, reqs[1].Coord, d.Coord)
verifyCoordinatesEqual(t, updates[1].Coord, d.Coord)
}
func TestFSM_SessionCreate_Destroy(t *testing.T) {

View File

@@ -32,7 +32,7 @@ const (
SessionRequestType
ACLRequestType
TombstoneRequestType
CoordinateRequestType
CoordinateBatchUpdateType
)
const (
@@ -633,20 +633,11 @@ type IndexedCoordinate struct {
QueryMeta
}
// CoordinateOp is used for encoding coordinate-related RPC requests.
type CoordinateOp string
const (
// CoordinateUpdate is used to update a node's coordinates in the catalog.
CoordinateUpdate CoordinateOp = "update"
)
// CoordinateUpdateRequest is used to update the network coordinate of a given
// node.
type CoordinateUpdateRequest struct {
Datacenter string
Node string
Op CoordinateOp
Coord *coordinate.Coordinate
WriteRequest
}