mirror of
https://github.com/status-im/consul.git
synced 2025-01-15 16:26:06 +00:00
1c2c975b0b
Prior to #13244, connect proxies and gateways could only be configured by an xDS session served by the local client agent. In an upcoming release, it will be possible to deploy a Consul service mesh without client agents. In this model, xDS sessions will be handled by the servers themselves, which necessitates load-balancing to prevent a single server from receiving a disproportionate amount of load and becoming overwhelmed. This introduces a simple form of load-balancing where Consul will attempt to achieve an even spread of load (xDS sessions) between all healthy servers. It does so by implementing a concurrent session limiter (limiter.SessionLimiter) and adjusting the limit according to autopilot state and proxy service registrations in the catalog. If a server is already over capacity (i.e. the session limit is lowered), Consul will begin draining sessions to rebalance the load. This will result in the client receiving a `RESOURCE_EXHAUSTED` status code. It is the client's responsibility to observe this response and reconnect to a different server. Users of the gRPC client connection brokered by the consul-server-connection-manager library will get this for free. The rate at which Consul will drain sessions to rebalance load is scaled dynamically based on the number of proxies in the catalog.
82 lines
1.8 KiB
Go
82 lines
1.8 KiB
Go
package limiter
|
|
|
|
import (
|
|
"context"
|
|
"sync"
|
|
"sync/atomic"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/stretchr/testify/require"
|
|
|
|
"github.com/hashicorp/consul/lib"
|
|
)
|
|
|
|
func init() { lib.SeedMathRand() }
|
|
|
|
func TestSessionLimiter(t *testing.T) {
|
|
lim := NewSessionLimiter()
|
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
t.Cleanup(cancel)
|
|
go lim.Run(ctx)
|
|
|
|
// doneCh is used to shut the goroutines down at the end of the test.
|
|
doneCh := make(chan struct{})
|
|
t.Cleanup(func() { close(doneCh) })
|
|
|
|
// Start 10 sessions, and increment the counter when they are terminated.
|
|
var (
|
|
terminations uint32
|
|
wg sync.WaitGroup
|
|
)
|
|
for i := 0; i < 10; i++ {
|
|
wg.Add(1)
|
|
go func() {
|
|
sess, err := lim.BeginSession()
|
|
require.NoError(t, err)
|
|
defer sess.End()
|
|
|
|
wg.Done()
|
|
|
|
select {
|
|
case <-sess.Terminated():
|
|
atomic.AddUint32(&terminations, 1)
|
|
case <-doneCh:
|
|
}
|
|
}()
|
|
}
|
|
|
|
// Wait for all the sessions to begin.
|
|
wg.Wait()
|
|
|
|
// Lowering max sessions to 5 should result in 5 sessions being terminated.
|
|
lim.SetMaxSessions(5)
|
|
require.Eventually(t, func() bool {
|
|
return atomic.LoadUint32(&terminations) == 5
|
|
}, 2*time.Second, 50*time.Millisecond)
|
|
|
|
// Attempting to start a new session should fail immediately.
|
|
_, err := lim.BeginSession()
|
|
require.Equal(t, ErrCapacityReached, err)
|
|
|
|
// Raising MaxSessions should make room for a new session.
|
|
lim.SetMaxSessions(6)
|
|
sess, err := lim.BeginSession()
|
|
require.NoError(t, err)
|
|
|
|
// ...but trying to start another new one should fail
|
|
_, err = lim.BeginSession()
|
|
require.Equal(t, ErrCapacityReached, err)
|
|
|
|
// ...until another session ends.
|
|
sess.End()
|
|
_, err = lim.BeginSession()
|
|
require.NoError(t, err)
|
|
|
|
// Calling End twice is a no-op.
|
|
sess.End()
|
|
_, err = lim.BeginSession()
|
|
require.Equal(t, ErrCapacityReached, err)
|
|
}
|