Switches to a smooth rate limit vs. a bursty one.

This commit is contained in:
James Phillips 2016-08-09 11:29:12 -07:00
parent 5efd35c590
commit 11ad551204
No known key found for this signature in database
GPG Key ID: 77183E682AC5FC11
2 changed files with 12 additions and 17 deletions

View File

@ -168,20 +168,8 @@ func (s *Server) fetchRemoteACLs(lastRemoteIndex uint64) (*structs.IndexedACLs,
// UpdateLocalACLs is given a list of changes to apply in order to bring the
// local ACLs in-line with the remote ACLs from the ACL datacenter.
func (s *Server) updateLocalACLs(changes structs.ACLRequests) error {
var ops int
start := time.Now()
minTimePerOp := time.Second / time.Duration(s.config.ACLReplicationApplyLimit)
for _, change := range changes {
// Do a very simple rate limit algorithm where we check every N
// operations and wait out to the second before we continue. If
// it's going slower than that, the sleep time will be negative
// so we will just keep going without delay.
if ops >= s.config.ACLReplicationApplyLimit {
elapsed := time.Now().Sub(start)
time.Sleep(1*time.Second - elapsed)
ops, start = 0, time.Now()
}
ops++
// Note that we are using the single ACL interface here and not
// performing all this inside a single transaction. This is OK
// for two reasons. First, there's nothing else other than this
@ -191,9 +179,16 @@ func (s *Server) updateLocalACLs(changes structs.ACLRequests) error {
// next replication pass will clean up and check everything
// again.
var reply string
start := time.Now()
if err := aclApplyInternal(s, change, &reply); err != nil {
return err
}
// Do a smooth rate limit to wait out the min time allowed for
// each op. If this op took longer than the min, then the sleep
// time will be negative and we will just move on.
elapsed := time.Now().Sub(start)
time.Sleep(minTimePerOp - elapsed)
}
return nil
}

View File

@ -240,12 +240,12 @@ func TestACLReplication_updateLocalACLs_RateLimit(t *testing.T) {
},
}
// Under the limit, should be quick.
// Should be throttled to 1 Hz.
start := time.Now()
if err := s1.updateLocalACLs(changes); err != nil {
t.Fatalf("err: %v", err)
}
if dur := time.Now().Sub(start); dur > 500*time.Millisecond {
if dur := time.Now().Sub(start); dur < time.Second {
t.Fatalf("too slow: %9.6f", dur.Seconds())
}
@ -258,12 +258,12 @@ func TestACLReplication_updateLocalACLs_RateLimit(t *testing.T) {
},
})
// Over the limit, should be throttled.
// Should be throttled to 1 Hz.
start = time.Now()
if err := s1.updateLocalACLs(changes); err != nil {
t.Fatalf("err: %v", err)
}
if dur := time.Now().Sub(start); dur < 500*time.Millisecond {
if dur := time.Now().Sub(start); dur < 2*time.Second {
t.Fatalf("too fast: %9.6f", dur.Seconds())
}
}