Simplifies the batching function and adds some comments.

James Phillips 2015-06-29 18:26:04 -07:00
parent e094f5a61d
commit 66a3d29743
3 changed files with 35 additions and 41 deletions


@@ -583,6 +583,9 @@ func (a *Agent) sendCoordinate() {
 			continue
 		}
 
+		// TODO - Consider adding a distance check so we don't send
+		// an update if the position hasn't changed by more than a
+		// threshold.
 		req := structs.CoordinateUpdateRequest{
 			Datacenter: a.config.Datacenter,
 			Node:       a.config.NodeName,
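For context, a rough sketch of what that TODO could look like: the serf coordinate package's DistanceTo estimates the round-trip time between two coordinates as a time.Duration, so the check reduces to comparing that estimate against a threshold. The shouldSend helper, its threshold parameter, and the caller's bookkeeping of the last-sent coordinate are assumptions for illustration, not part of this commit.

	package agent

	import (
		"time"

		"github.com/hashicorp/serf/coordinate"
	)

	// shouldSend is a hypothetical helper: it reports whether the node has
	// moved far enough from the coordinate it last sent to justify another
	// update. DistanceTo estimates the RTT between the two coordinates.
	func shouldSend(last, current *coordinate.Coordinate, threshold time.Duration) bool {
		if last == nil {
			return true // nothing sent yet
		}
		return current.DistanceTo(last) >= threshold
	}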


@@ -279,8 +279,8 @@ func DefaultConfig() *Config {
 		// SyncCoordinateInterval defaults to 20 seconds, and scales up
 		// as the number of nodes in the cluster goes up. For 100k nodes,
 		// it will move up to 201 seconds, which gives an update rate of
-		// just under 500 updates per second. With this tuning we will
-		// apply less than 5 batches per period.
+		// just under 500 per second coming in from the clients. With this
+		// tuning we will be doing 1 transaction per second at this load.
 		CoordinateUpdatePeriod:     5 * time.Second,
 		CoordinateUpdateBatchSize:  512,
 		CoordinateUpdateMaxBatches: 5,
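Checking the arithmetic behind that comment: 100,000 nodes each sending one update every 201 seconds works out to about 497 updates per second, "just under 500". Over one 5-second CoordinateUpdatePeriod that is roughly 2,488 pending updates, which at a CoordinateUpdateBatchSize of 512 fills just under 5 batches per period (the old wording), and 5 batches per 5-second period is the 1 transaction per second the new wording quotes.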


@@ -49,55 +49,46 @@ func (c *Coordinate) batchUpdate() {
 // batchApplyUpdates applies all pending updates to the Raft log in a series of batches.
 func (c *Coordinate) batchApplyUpdates() error {
+	// Grab the pending updates and release the lock so we can still handle
+	// incoming messages.
 	c.updatesLock.Lock()
-	defer c.updatesLock.Unlock()
+	pending := c.updates
+	c.updates = make(map[string]*coordinate.Coordinate)
+	c.updatesLock.Unlock()
 
-	// No matter what happens in here we should clear out any unprocessed
-	// updates
-	defer func() {
-		if len(c.updates) > 0 {
-			c.srv.logger.Printf("[ERR] Discarded %d coordinate updates; increase SyncCoordinateInterval", len(c.updates))
-			c.updates = make(map[string]*coordinate.Coordinate)
-		}
-	}()
-
-	batch := make([]*structs.Coordinate, 0, c.srv.config.CoordinateUpdateBatchSize)
-	flushBatch := func() error {
-		if len(batch) == 0 {
-			return nil
-		}
-		if _, err := c.srv.raftApply(structs.CoordinateBatchUpdateType, batch); err != nil {
-			return err
-		}
-		batch = batch[:0]
-		return nil
+	// Enforce the rate limit.
+	limit := c.srv.config.CoordinateUpdateBatchSize * c.srv.config.CoordinateUpdateMaxBatches
+	size := len(pending)
+	if size > limit {
+		c.srv.logger.Printf("[ERR] consul.coordinate: Discarded %d coordinate updates; increase SyncCoordinateInterval", size-limit)
+		size = limit
 	}
 
-	// Process up to the max configured number of updates.
-	remaining := c.srv.config.CoordinateUpdateBatchSize * c.srv.config.CoordinateUpdateMaxBatches
-	for node, coord := range(c.updates) {
-		if remaining <= 0 {
+	// Transform the map into a slice that we can feed to the Raft log in
+	// batches.
+	updates := make([]structs.Coordinate, size)
+	i := 0
+	for node, coord := range(pending) {
+		if !(i < size) {
 			break
 		}
-		batch = append(batch, &structs.Coordinate{node, coord})
-		delete(c.updates, node)
-		remaining--
 
-		if len(batch) == c.srv.config.CoordinateUpdateBatchSize {
-			if err := flushBatch(); err != nil {
-				return err
-			}
-		}
+		updates[i] = structs.Coordinate{node, coord}
+		i++
 	}
 
-	// Flush any leftovers from a partial batch.
-	if err := flushBatch(); err != nil {
-		return err
+	// Apply the updates to the Raft log in batches.
+	for start := 0; start < size; start += c.srv.config.CoordinateUpdateBatchSize {
+		end := start + c.srv.config.CoordinateUpdateBatchSize
+		if end > size {
+			end = size
+		}
+
+		slice := updates[start:end]
+		if _, err := c.srv.raftApply(structs.CoordinateBatchUpdateType, slice); err != nil {
+			return err
+		}
 	}
 	return nil
 }
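Stripped of the Consul specifics, the new structure is: swap the pending map out while briefly holding the lock, cap what gets kept, then commit fixed-size slices to the log with the lock already released. The old version instead held updatesLock (via defer) across every raftApply call. A minimal standalone sketch of that pattern, with illustrative names rather than Consul's:

	package sketch

	import "sync"

	type batcher struct {
		mu      sync.Mutex
		pending map[string]int
	}

	// drain swaps the pending map for a fresh one, so writers are only
	// blocked for the duration of a pointer swap, not for the commits.
	func (b *batcher) drain() map[string]int {
		b.mu.Lock()
		defer b.mu.Unlock()
		p := b.pending
		b.pending = make(map[string]int)
		return p
	}

	// applyInBatches mirrors the start/end slicing loop above: commit is
	// called once per fixed-size chunk. batchSize must be positive.
	func applyInBatches(updates []int, batchSize int, commit func([]int) error) error {
		for start := 0; start < len(updates); start += batchSize {
			end := start + batchSize
			if end > len(updates) {
				end = len(updates)
			}
			if err := commit(updates[start:end]); err != nil {
				return err
			}
		}
		return nil
	}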