From 11ad551204df2a14896f9b95c76d65ace9668179 Mon Sep 17 00:00:00 2001 From: James Phillips Date: Tue, 9 Aug 2016 11:29:12 -0700 Subject: [PATCH] Switches to a smooth rate limit vs. a bursty one. --- consul/acl_replication.go | 21 ++++++++------------- consul/acl_replication_test.go | 8 ++++---- 2 files changed, 12 insertions(+), 17 deletions(-) diff --git a/consul/acl_replication.go b/consul/acl_replication.go index 2a74d9b182..573b562513 100644 --- a/consul/acl_replication.go +++ b/consul/acl_replication.go @@ -168,20 +168,8 @@ func (s *Server) fetchRemoteACLs(lastRemoteIndex uint64) (*structs.IndexedACLs, // UpdateLocalACLs is given a list of changes to apply in order to bring the // local ACLs in-line with the remote ACLs from the ACL datacenter. func (s *Server) updateLocalACLs(changes structs.ACLRequests) error { - var ops int - start := time.Now() + minTimePerOp := time.Second / time.Duration(s.config.ACLReplicationApplyLimit) for _, change := range changes { - // Do a very simple rate limit algorithm where we check every N - // operations and wait out to the second before we continue. If - // it's going slower than that, the sleep time will be negative - // so we will just keep going without delay. - if ops >= s.config.ACLReplicationApplyLimit { - elapsed := time.Now().Sub(start) - time.Sleep(1*time.Second - elapsed) - ops, start = 0, time.Now() - } - ops++ - // Note that we are using the single ACL interface here and not // performing all this inside a single transaction. This is OK // for two reasons. First, there's nothing else other than this @@ -191,9 +179,16 @@ func (s *Server) updateLocalACLs(changes structs.ACLRequests) error { // next replication pass will clean up and check everything // again. var reply string + start := time.Now() if err := aclApplyInternal(s, change, &reply); err != nil { return err } + + // Do a smooth rate limit to wait out the min time allowed for + // each op. If this op took longer than the min, then the sleep + // time will be negative and we will just move on. + elapsed := time.Now().Sub(start) + time.Sleep(minTimePerOp - elapsed) } return nil } diff --git a/consul/acl_replication_test.go b/consul/acl_replication_test.go index 6bb0bb9602..f7f10752b8 100644 --- a/consul/acl_replication_test.go +++ b/consul/acl_replication_test.go @@ -240,12 +240,12 @@ func TestACLReplication_updateLocalACLs_RateLimit(t *testing.T) { }, } - // Under the limit, should be quick. + // Should be throttled to 1 Hz. start := time.Now() if err := s1.updateLocalACLs(changes); err != nil { t.Fatalf("err: %v", err) } - if dur := time.Now().Sub(start); dur > 500*time.Millisecond { + if dur := time.Now().Sub(start); dur < time.Second { t.Fatalf("too slow: %9.6f", dur.Seconds()) } @@ -258,12 +258,12 @@ func TestACLReplication_updateLocalACLs_RateLimit(t *testing.T) { }, }) - // Over the limit, should be throttled. + // Should be throttled to 1 Hz. start = time.Now() if err := s1.updateLocalACLs(changes); err != nil { t.Fatalf("err: %v", err) } - if dur := time.Now().Sub(start); dur < 500*time.Millisecond { + if dur := time.Now().Sub(start); dur < 2*time.Second { t.Fatalf("too fast: %9.6f", dur.Seconds()) } }