253 lines
7.0 KiB
Go
Raw Normal View History

// Copyright (c) HashiCorp, Inc.
[COMPLIANCE] License changes (#18443) * Adding explicit MPL license for sub-package This directory and its subdirectories (packages) contain files licensed with the MPLv2 `LICENSE` file in this directory and are intentionally licensed separately from the BSL `LICENSE` file at the root of this repository. * Adding explicit MPL license for sub-package This directory and its subdirectories (packages) contain files licensed with the MPLv2 `LICENSE` file in this directory and are intentionally licensed separately from the BSL `LICENSE` file at the root of this repository. * Updating the license from MPL to Business Source License Going forward, this project will be licensed under the Business Source License v1.1. Please see our blog post for more details at <Blog URL>, FAQ at www.hashicorp.com/licensing-faq, and details of the license at www.hashicorp.com/bsl. * add missing license headers * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 --------- Co-authored-by: hashicorp-copywrite[bot] <110428419+hashicorp-copywrite[bot]@users.noreply.github.com>
2023-08-11 09:12:13 -04:00
// SPDX-License-Identifier: BUSL-1.1
// package limiter provides primatives for limiting the number of concurrent
// operations in-flight.
package limiter
import (
"context"
"errors"
"math/rand"
"sort"
"sync"
"sync/atomic"
"golang.org/x/time/rate"
)
// Unlimited can be used to allow an unlimited number of concurrent sessions.
const Unlimited uint32 = 0
// ErrCapacityReached is returned when there is no capacity for additional sessions.
var ErrCapacityReached = errors.New("active session limit reached")
// SessionLimiter is a session-based concurrency limiter, it provides the basis
// of gRPC/xDS load balancing.
//
// Stream handlers obtain a session with BeginSession before they begin serving
// resources - if the server has reached capacity ErrCapacityReached is returned,
// otherwise a Session is returned.
//
// It is the session-holder's responsibility to:
//
// 1. Call End on the session when finished.
// 2. Receive on the session's Terminated channel and exit (e.g. close the gRPC
// stream) when it is closed.
//
// The maximum number of concurrent sessions is controlled with SetMaxSessions.
// If there are more than the given maximum sessions already in-flight,
// SessionLimiter will drain randomly-selected sessions at a rate controlled
// by SetDrainRateLimit.
type SessionLimiter struct {
drainLimiter *rate.Limiter
// max and inFlight are read/written using atomic operations.
max, inFlight uint32
// wakeCh is used to trigger the Run loop to start draining excess sessions.
wakeCh chan struct{}
// Everything below here is guarded by mu.
mu sync.Mutex
maxSessionID uint64
sessionIDs []uint64 // sessionIDs must be sorted so we can binary search it.
sessions map[uint64]*session
}
// NewSessionLimiter creates a new SessionLimiter.
func NewSessionLimiter() *SessionLimiter {
return &SessionLimiter{
drainLimiter: rate.NewLimiter(rate.Inf, 1),
max: Unlimited,
wakeCh: make(chan struct{}, 1),
sessionIDs: make([]uint64, 0),
sessions: make(map[uint64]*session),
}
}
// Run the SessionLimiter's drain loop, which terminates excess sessions if the
// limit is lowered. It will exit when the given context is canceled or reaches
// its deadline.
func (l *SessionLimiter) Run(ctx context.Context) {
for {
select {
case <-l.wakeCh:
for {
if !l.overCapacity() {
break
}
if err := l.drainLimiter.Wait(ctx); err != nil {
break
}
if !l.overCapacity() {
break
}
l.terminateSession()
}
case <-ctx.Done():
return
}
}
}
// SetMaxSessions controls the maximum number of concurrent sessions. If it is
// lower, randomly-selected sessions will be drained.
func (l *SessionLimiter) SetMaxSessions(max uint32) {
atomic.StoreUint32(&l.max, max)
// Send on wakeCh without blocking if the Run loop is busy. wakeCh has a
// buffer of 1, so no triggers will be missed.
select {
case l.wakeCh <- struct{}{}:
default:
}
}
// SetDrainRateLimit controls the rate at which excess sessions will be drained.
func (l *SessionLimiter) SetDrainRateLimit(limit rate.Limit) {
l.drainLimiter.SetLimit(limit)
}
// BeginSession begins a new session, or returns ErrCapacityReached if the
// concurrent session limit has been reached.
//
// It is the session-holder's responsibility to:
//
// 1. Call End on the session when finished.
// 2. Receive on the session's Terminated channel and exit (e.g. close the gRPC
// stream) when it is closed.
func (l *SessionLimiter) BeginSession() (Session, error) {
if !l.hasCapacity() {
return nil, ErrCapacityReached
}
l.mu.Lock()
defer l.mu.Unlock()
return l.createSessionLocked(), nil
}
// Note: hasCapacity is *best effort*. As we do not hold l.mu it's possible that:
//
// - max has changed by the time we compare it to inFlight.
// - inFlight < max now, but increases before we create a new session.
//
// This is acceptable for our uses, especially because excess sessions will
// eventually be drained.
func (l *SessionLimiter) hasCapacity() bool {
max := atomic.LoadUint32(&l.max)
if max == Unlimited {
return true
}
cur := atomic.LoadUint32(&l.inFlight)
return max > cur
}
// Note: overCapacity is *best effort*. As we do not hold l.mu it's possible that:
//
// - max has changed by the time we compare it to inFlight.
// - inFlight > max now, but decreases before we terminate a session.
func (l *SessionLimiter) overCapacity() bool {
max := atomic.LoadUint32(&l.max)
if max == Unlimited {
return false
}
cur := atomic.LoadUint32(&l.inFlight)
return cur > max
}
func (l *SessionLimiter) terminateSession() {
l.mu.Lock()
defer l.mu.Unlock()
idx := rand.Intn(len(l.sessionIDs))
id := l.sessionIDs[idx]
l.sessions[id].terminate()
l.deleteSessionLocked(idx, id)
}
func (l *SessionLimiter) createSessionLocked() *session {
session := &session{
l: l,
id: l.maxSessionID,
termCh: make(chan struct{}),
}
l.maxSessionID++
l.sessionIDs = append(l.sessionIDs, session.id)
l.sessions[session.id] = session
atomic.AddUint32(&l.inFlight, 1)
return session
}
func (l *SessionLimiter) deleteSessionLocked(idx int, id uint64) {
delete(l.sessions, id)
// Note: it's important that we preserve the order here (which most allocation
// free deletion tricks don't) because we binary search the slice.
l.sessionIDs = append(l.sessionIDs[:idx], l.sessionIDs[idx+1:]...)
atomic.AddUint32(&l.inFlight, ^uint32(0))
}
func (l *SessionLimiter) deleteSessionWithID(id uint64) {
l.mu.Lock()
defer l.mu.Unlock()
idx := sort.Search(len(l.sessionIDs), func(i int) bool {
return l.sessionIDs[i] >= id
})
if idx == len(l.sessionIDs) || l.sessionIDs[idx] != id {
// It's possible that we weren't able to find the id because the session has
// already been deleted. This could be because the session-holder called End
// more than once, or because the session was drained. In either case there's
// nothing more to do.
return
}
l.deleteSessionLocked(idx, id)
}
// SessionTerminatedChan is a channel that will be closed to notify session-
// holders that a session has been terminated.
type SessionTerminatedChan <-chan struct{}
// Session allows its holder to perform an operation (e.g. serve a gRPC stream)
// concurrenly with other session-holders. Sessions may be terminated abruptly
// by the SessionLimiter, so it is the responsibility of the holder to receive
// on the Terminated channel and halt the operation when it is closed.
type Session interface {
// End the session.
//
// This MUST be called when the session-holder is done (e.g. the gRPC stream
// is closed).
End()
// Terminated is a channel that is closed when the session is terminated.
//
// The session-holder MUST receive on it and exit (e.g. close the gRPC stream)
// when it is closed.
Terminated() SessionTerminatedChan
}
type session struct {
l *SessionLimiter
id uint64
termCh chan struct{}
}
func (s *session) End() { s.l.deleteSessionWithID(s.id) }
func (s *session) Terminated() SessionTerminatedChan { return s.termCh }
func (s *session) terminate() { close(s.termCh) }