mirror of
https://github.com/status-im/consul.git
synced 2025-01-22 11:40:06 +00:00
5fb9df1640
* Adding explicit MPL license for sub-package This directory and its subdirectories (packages) contain files licensed with the MPLv2 `LICENSE` file in this directory and are intentionally licensed separately from the BSL `LICENSE` file at the root of this repository. * Adding explicit MPL license for sub-package This directory and its subdirectories (packages) contain files licensed with the MPLv2 `LICENSE` file in this directory and are intentionally licensed separately from the BSL `LICENSE` file at the root of this repository. * Updating the license from MPL to Business Source License Going forward, this project will be licensed under the Business Source License v1.1. Please see our blog post for more details at <Blog URL>, FAQ at www.hashicorp.com/licensing-faq, and details of the license at www.hashicorp.com/bsl. * add missing license headers * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 --------- Co-authored-by: hashicorp-copywrite[bot] <110428419+hashicorp-copywrite[bot]@users.noreply.github.com>
110 lines
2.9 KiB
Go
110 lines
2.9 KiB
Go
// Copyright (c) HashiCorp, Inc.
|
|
// SPDX-License-Identifier: BUSL-1.1
|
|
|
|
// Package semaphore implements a simple semaphore that is based on
|
|
// golang.org/x/sync/semaphore but doesn't support weights. Its advantage over
|
|
// a simple buffered chan is that the capacity of the semaphore (i.e. the number
|
|
// of slots available) can be changed dynamically at runtime without waiting for
|
|
// all existing work to stop. This makes it easier to implement e.g. concurrency
|
|
// limits on certain operations that can be reconfigured at runtime.
|
|
package semaphore
|
|
|
|
import (
|
|
"container/list"
|
|
"context"
|
|
"sync"
|
|
)
|
|
|
|
// Dynamic implements a semaphore whose capacity can be changed dynamically at
// run time.
//
// The zero value has a capacity of zero; call SetSize before use. A Dynamic
// contains a sync.Mutex and therefore must not be copied after first use.
type Dynamic struct {
	// size is the configured capacity, i.e. the maximum number of slots that
	// may be held at once. It may be lower than cur right after a shrink.
	size int64
	// cur is the number of slots currently held.
	cur int64
	// waiters is a FIFO queue of chan struct{} values, one per blocked
	// Acquire call. A waiter's channel is closed when it is granted a slot.
	waiters list.List
	// mu guards all of the fields above.
	mu sync.Mutex
}

// NewDynamic returns a dynamic semaphore with the given initial capacity. Note
// that this is for convenience and to match golang.org/x/sync/semaphore;
// it's also possible to use a zero-value semaphore provided SetSize is called
// before use.
func NewDynamic(n int64) *Dynamic {
	return &Dynamic{size: n}
}

// SetSize dynamically updates the number of available slots. If there are more
// than n slots currently acquired, no further acquires will succeed until
// sufficient have been released to take the total outstanding below n again.
//
// If the new size makes slots available, callers already blocked in Acquire
// are granted them immediately, in FIFO order. SetSize always returns nil.
func (s *Dynamic) SetSize(n int64) error {
	s.mu.Lock()
	defer s.mu.Unlock()

	s.size = n
	// Growing the capacity may have freed slots; without this, queued waiters
	// would stay blocked until some unrelated Release happened to run.
	s.notifyWaiters()
	return nil
}

// Acquire attempts to acquire one "slot" in the semaphore, blocking only until
// ctx is Done. On success, returns nil. On failure, returns ctx.Err() and leaves
// the semaphore unchanged.
//
// If ctx is already done, Acquire may still succeed without blocking.
func (s *Dynamic) Acquire(ctx context.Context) error {
	s.mu.Lock()
	// Fast path. SetSize and Release hand every free slot to queued waiters
	// before unlocking, so spare capacity here implies an empty queue and
	// taking the slot directly cannot jump ahead of anyone.
	if s.cur < s.size {
		s.cur++
		s.mu.Unlock()
		return nil
	}

	// Need to wait: join the back of the waiter queue.
	ready := make(chan struct{})
	elem := s.waiters.PushBack(ready)
	s.mu.Unlock()

	select {
	case <-ready:
		// A slot was granted to us; cur was already incremented on our behalf.
		return nil
	case <-ctx.Done():
	}

	// The context ended. Re-check under the lock, because a slot may have been
	// granted between the cancellation firing and us getting here.
	s.mu.Lock()
	defer s.mu.Unlock()

	select {
	case <-ready:
		// Acquired the semaphore after we were canceled. Rather than trying to
		// fix up the queue, just pretend we didn't notice the cancellation.
		return nil
	default:
	}

	s.waiters.Remove(elem)
	return ctx.Err()
}

// Release releases one slot of the semaphore and grants any capacity that is
// now free to queued waiters in FIFO order. It will panic if Release is called
// on an empty semaphore.
//
// If the semaphore is still at or over capacity after the release (because
// SetSize shrank it), no waiter is woken.
func (s *Dynamic) Release() {
	s.mu.Lock()
	defer s.mu.Unlock()

	if s.cur < 1 {
		panic("semaphore: bad release")
	}

	s.cur--
	s.notifyWaiters()
}

// notifyWaiters grants free slots to queued waiters, oldest first, until the
// semaphore is full or the queue is empty. The caller must hold s.mu.
func (s *Dynamic) notifyWaiters() {
	for s.cur < s.size {
		next := s.waiters.Front()
		if next == nil {
			return
		}
		// Remove the waiter before signalling so that its cancellation path
		// (which checks the channel under the lock) never removes it twice.
		s.waiters.Remove(next)
		// Account for the slot on the waiter's behalf; it returns from
		// Acquire without touching cur again.
		s.cur++
		close(next.Value.(chan struct{}))
	}
}
|