mirror of
https://github.com/status-im/consul.git
synced 2025-01-11 14:24:39 +00:00
246 lines
6.7 KiB
Go
246 lines
6.7 KiB
Go
|
// package limiter provides primatives for limiting the number of concurrent
|
||
|
// operations in-flight.
|
||
|
package limiter
|
||
|
|
||
|
import (
|
||
|
"context"
|
||
|
"errors"
|
||
|
"math/rand"
|
||
|
"sort"
|
||
|
"sync"
|
||
|
"sync/atomic"
|
||
|
|
||
|
"golang.org/x/time/rate"
|
||
|
)
|
||
|
|
||
|
// Unlimited can be used to allow an unlimited number of concurrent sessions.
|
||
|
const Unlimited uint32 = 0
|
||
|
|
||
|
// ErrCapacityReached is returned when there is no capacity for additional sessions.
|
||
|
var ErrCapacityReached = errors.New("active session limit reached")
|
||
|
|
||
|
// SessionLimiter is a session-based concurrency limiter, it provides the basis
|
||
|
// of gRPC/xDS load balancing.
|
||
|
//
|
||
|
// Stream handlers obtain a session with BeginSession before they begin serving
|
||
|
// resources - if the server has reached capacity ErrCapacityReached is returned,
|
||
|
// otherwise a Session is returned.
|
||
|
//
|
||
|
// It is the session-holder's responsibility to:
|
||
|
//
|
||
|
// 1. Call End on the session when finished.
|
||
|
// 2. Receive on the session's Terminated channel and exit (e.g. close the gRPC
|
||
|
// stream) when it is closed.
|
||
|
//
|
||
|
// The maximum number of concurrent sessions is controlled with SetMaxSessions.
|
||
|
// If there are more than the given maximum sessions already in-flight,
|
||
|
// SessionLimiter will drain randomly-selected sessions at a rate controlled
|
||
|
// by SetDrainRateLimit.
|
||
|
type SessionLimiter struct {
|
||
|
drainLimiter *rate.Limiter
|
||
|
|
||
|
// max and inFlight are read/written using atomic operations.
|
||
|
max, inFlight uint32
|
||
|
|
||
|
// wakeCh is used to trigger the Run loop to start draining excess sessions.
|
||
|
wakeCh chan struct{}
|
||
|
|
||
|
// Everything below here is guarded by mu.
|
||
|
mu sync.Mutex
|
||
|
maxSessionID uint64
|
||
|
sessionIDs []uint64 // sessionIDs must be sorted so we can binary search it.
|
||
|
sessions map[uint64]*session
|
||
|
}
|
||
|
|
||
|
// NewSessionLimiter creates a new SessionLimiter.
|
||
|
func NewSessionLimiter() *SessionLimiter {
|
||
|
return &SessionLimiter{
|
||
|
drainLimiter: rate.NewLimiter(rate.Inf, 1),
|
||
|
max: Unlimited,
|
||
|
wakeCh: make(chan struct{}, 1),
|
||
|
sessionIDs: make([]uint64, 0),
|
||
|
sessions: make(map[uint64]*session),
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// Run the SessionLimiter's drain loop, which terminates excess sessions if the
|
||
|
// limit is lowered. It will exit when the given context is canceled or reaches
|
||
|
// its deadline.
|
||
|
func (l *SessionLimiter) Run(ctx context.Context) {
|
||
|
for {
|
||
|
select {
|
||
|
case <-l.wakeCh:
|
||
|
for {
|
||
|
if !l.overCapacity() {
|
||
|
break
|
||
|
}
|
||
|
|
||
|
if err := l.drainLimiter.Wait(ctx); err != nil {
|
||
|
break
|
||
|
}
|
||
|
|
||
|
if !l.overCapacity() {
|
||
|
break
|
||
|
}
|
||
|
|
||
|
l.terminateSession()
|
||
|
}
|
||
|
case <-ctx.Done():
|
||
|
return
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// SetMaxSessions controls the maximum number of concurrent sessions. If it is
|
||
|
// lower, randomly-selected sessions will be drained.
|
||
|
func (l *SessionLimiter) SetMaxSessions(max uint32) {
|
||
|
atomic.StoreUint32(&l.max, max)
|
||
|
|
||
|
// Send on wakeCh without blocking if the Run loop is busy. wakeCh has a
|
||
|
// buffer of 1, so no triggers will be missed.
|
||
|
select {
|
||
|
case l.wakeCh <- struct{}{}:
|
||
|
default:
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// SetDrainRateLimit controls the rate at which excess sessions will be drained.
|
||
|
func (l *SessionLimiter) SetDrainRateLimit(limit rate.Limit) {
|
||
|
l.drainLimiter.SetLimit(limit)
|
||
|
}
|
||
|
|
||
|
// BeginSession begins a new session, or returns ErrCapacityReached if the
|
||
|
// concurrent session limit has been reached.
|
||
|
//
|
||
|
// It is the session-holder's responsibility to:
|
||
|
//
|
||
|
// 1. Call End on the session when finished.
|
||
|
// 2. Receive on the session's Terminated channel and exit (e.g. close the gRPC
|
||
|
// stream) when it is closed.
|
||
|
func (l *SessionLimiter) BeginSession() (Session, error) {
|
||
|
if !l.hasCapacity() {
|
||
|
return nil, ErrCapacityReached
|
||
|
}
|
||
|
|
||
|
l.mu.Lock()
|
||
|
defer l.mu.Unlock()
|
||
|
return l.createSessionLocked(), nil
|
||
|
}
|
||
|
|
||
|
// Note: hasCapacity is *best effort*. As we do not hold l.mu it's possible that:
|
||
|
//
|
||
|
// - max has changed by the time we compare it to inFlight.
|
||
|
// - inFlight < max now, but increases before we create a new session.
|
||
|
//
|
||
|
// This is acceptable for our uses, especially because excess sessions will
|
||
|
// eventually be drained.
|
||
|
func (l *SessionLimiter) hasCapacity() bool {
|
||
|
max := atomic.LoadUint32(&l.max)
|
||
|
if max == Unlimited {
|
||
|
return true
|
||
|
}
|
||
|
|
||
|
cur := atomic.LoadUint32(&l.inFlight)
|
||
|
return max > cur
|
||
|
}
|
||
|
|
||
|
// Note: overCapacity is *best effort*. As we do not hold l.mu it's possible that:
|
||
|
//
|
||
|
// - max has changed by the time we compare it to inFlight.
|
||
|
// - inFlight > max now, but decreases before we terminate a session.
|
||
|
func (l *SessionLimiter) overCapacity() bool {
|
||
|
max := atomic.LoadUint32(&l.max)
|
||
|
if max == Unlimited {
|
||
|
return false
|
||
|
}
|
||
|
|
||
|
cur := atomic.LoadUint32(&l.inFlight)
|
||
|
return cur > max
|
||
|
}
|
||
|
|
||
|
func (l *SessionLimiter) terminateSession() {
|
||
|
l.mu.Lock()
|
||
|
defer l.mu.Unlock()
|
||
|
|
||
|
idx := rand.Intn(len(l.sessionIDs))
|
||
|
id := l.sessionIDs[idx]
|
||
|
l.sessions[id].terminate()
|
||
|
l.deleteSessionLocked(idx, id)
|
||
|
}
|
||
|
|
||
|
func (l *SessionLimiter) createSessionLocked() *session {
|
||
|
session := &session{
|
||
|
l: l,
|
||
|
id: l.maxSessionID,
|
||
|
termCh: make(chan struct{}),
|
||
|
}
|
||
|
|
||
|
l.maxSessionID++
|
||
|
l.sessionIDs = append(l.sessionIDs, session.id)
|
||
|
l.sessions[session.id] = session
|
||
|
|
||
|
atomic.AddUint32(&l.inFlight, 1)
|
||
|
|
||
|
return session
|
||
|
}
|
||
|
|
||
|
func (l *SessionLimiter) deleteSessionLocked(idx int, id uint64) {
|
||
|
delete(l.sessions, id)
|
||
|
|
||
|
// Note: it's important that we preserve the order here (which most allocation
|
||
|
// free deletion tricks don't) because we binary search the slice.
|
||
|
l.sessionIDs = append(l.sessionIDs[:idx], l.sessionIDs[idx+1:]...)
|
||
|
|
||
|
atomic.AddUint32(&l.inFlight, ^uint32(0))
|
||
|
}
|
||
|
|
||
|
func (l *SessionLimiter) deleteSessionWithID(id uint64) {
|
||
|
l.mu.Lock()
|
||
|
defer l.mu.Unlock()
|
||
|
|
||
|
idx := sort.Search(len(l.sessionIDs), func(i int) bool {
|
||
|
return l.sessionIDs[i] >= id
|
||
|
})
|
||
|
|
||
|
if idx == len(l.sessionIDs) || l.sessionIDs[idx] != id {
|
||
|
// It's possible that we weren't able to find the id because the session has
|
||
|
// already been deleted. This could be because the session-holder called End
|
||
|
// more than once, or because the session was drained. In either case there's
|
||
|
// nothing more to do.
|
||
|
return
|
||
|
}
|
||
|
|
||
|
l.deleteSessionLocked(idx, id)
|
||
|
}
|
||
|
|
||
|
// Session allows its holder to perform an operation (e.g. serve a gRPC stream)
|
||
|
// concurrenly with other session-holders. Sessions may be terminated abruptly
|
||
|
// by the SessionLimiter, so it is the responsibility of the holder to receive
|
||
|
// on the Terminated channel and halt the operation when it is closed.
|
||
|
type Session interface {
|
||
|
// End the session.
|
||
|
//
|
||
|
// This MUST be called when the session-holder is done (e.g. the gRPC stream
|
||
|
// is closed).
|
||
|
End()
|
||
|
|
||
|
// Terminated is a channel that is closed when the session is terminated.
|
||
|
//
|
||
|
// The session-holder MUST receive on it and exit (e.g. close the gRPC stream)
|
||
|
// when it is closed.
|
||
|
Terminated() <-chan struct{}
|
||
|
}
|
||
|
|
||
|
type session struct {
|
||
|
l *SessionLimiter
|
||
|
|
||
|
id uint64
|
||
|
termCh chan struct{}
|
||
|
}
|
||
|
|
||
|
func (s *session) End() { s.l.deleteSessionWithID(s.id) }
|
||
|
|
||
|
func (s *session) Terminated() <-chan struct{} { return s.termCh }
|
||
|
|
||
|
func (s *session) terminate() { close(s.termCh) }
|