mirror of https://github.com/status-im/consul.git
Merge pull request #9302 from hashicorp/dnephin/add-service-3
agent: remove ServiceManager.Start goroutine
This commit is contained in:
commit
727a402810
|
@ -48,6 +48,7 @@ import (
|
||||||
"github.com/hashicorp/consul/ipaddr"
|
"github.com/hashicorp/consul/ipaddr"
|
||||||
"github.com/hashicorp/consul/lib"
|
"github.com/hashicorp/consul/lib"
|
||||||
"github.com/hashicorp/consul/lib/file"
|
"github.com/hashicorp/consul/lib/file"
|
||||||
|
"github.com/hashicorp/consul/lib/mutex"
|
||||||
"github.com/hashicorp/consul/logging"
|
"github.com/hashicorp/consul/logging"
|
||||||
"github.com/hashicorp/consul/tlsutil"
|
"github.com/hashicorp/consul/tlsutil"
|
||||||
"github.com/hashicorp/consul/types"
|
"github.com/hashicorp/consul/types"
|
||||||
|
@ -222,7 +223,7 @@ type Agent struct {
|
||||||
exposedPorts map[string]int
|
exposedPorts map[string]int
|
||||||
|
|
||||||
// stateLock protects the agent state
|
// stateLock protects the agent state
|
||||||
stateLock sync.Mutex
|
stateLock *mutex.Mutex
|
||||||
|
|
||||||
// dockerClient is the client for performing docker health checks.
|
// dockerClient is the client for performing docker health checks.
|
||||||
dockerClient *checks.DockerClient
|
dockerClient *checks.DockerClient
|
||||||
|
@ -358,6 +359,7 @@ func New(bd BaseDeps) (*Agent, error) {
|
||||||
retryJoinCh: make(chan error),
|
retryJoinCh: make(chan error),
|
||||||
shutdownCh: make(chan struct{}),
|
shutdownCh: make(chan struct{}),
|
||||||
endpoints: make(map[string]string),
|
endpoints: make(map[string]string),
|
||||||
|
stateLock: mutex.New(),
|
||||||
|
|
||||||
baseDeps: bd,
|
baseDeps: bd,
|
||||||
tokens: bd.Tokens,
|
tokens: bd.Tokens,
|
||||||
|
@ -512,7 +514,6 @@ func (a *Agent) Start(ctx context.Context) error {
|
||||||
if err := a.baseDeps.AutoConfig.Start(&lib.StopChannelContext{StopCh: a.shutdownCh}); err != nil {
|
if err := a.baseDeps.AutoConfig.Start(&lib.StopChannelContext{StopCh: a.shutdownCh}); err != nil {
|
||||||
return fmt.Errorf("AutoConf failed to start certificate monitor: %w", err)
|
return fmt.Errorf("AutoConf failed to start certificate monitor: %w", err)
|
||||||
}
|
}
|
||||||
a.serviceManager.Start()
|
|
||||||
|
|
||||||
// Load checks/services/metadata.
|
// Load checks/services/metadata.
|
||||||
emptyCheckSnapshot := map[structs.CheckID]*structs.HealthCheck{}
|
emptyCheckSnapshot := map[structs.CheckID]*structs.HealthCheck{}
|
||||||
|
|
|
@ -26,13 +26,6 @@ type ServiceManager struct {
|
||||||
// services tracks all active watches for registered services
|
// services tracks all active watches for registered services
|
||||||
services map[structs.ServiceID]*serviceConfigWatch
|
services map[structs.ServiceID]*serviceConfigWatch
|
||||||
|
|
||||||
// registerCh is a channel for receiving service registration requests from
|
|
||||||
// from serviceConfigWatchers.
|
|
||||||
// The registrations are handled in the background when watches are notified of
|
|
||||||
// changes. All sends and receives must also obey the ctx.Done() channel to
|
|
||||||
// avoid a deadlock during shutdown.
|
|
||||||
registerCh chan *asyncRegisterRequest
|
|
||||||
|
|
||||||
// ctx is the shared context for all goroutines launched
|
// ctx is the shared context for all goroutines launched
|
||||||
ctx context.Context
|
ctx context.Context
|
||||||
|
|
||||||
|
@ -46,11 +39,10 @@ type ServiceManager struct {
|
||||||
func NewServiceManager(agent *Agent) *ServiceManager {
|
func NewServiceManager(agent *Agent) *ServiceManager {
|
||||||
ctx, cancel := context.WithCancel(context.Background())
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
return &ServiceManager{
|
return &ServiceManager{
|
||||||
agent: agent,
|
agent: agent,
|
||||||
services: make(map[structs.ServiceID]*serviceConfigWatch),
|
services: make(map[structs.ServiceID]*serviceConfigWatch),
|
||||||
registerCh: make(chan *asyncRegisterRequest), // must be unbuffered
|
ctx: ctx,
|
||||||
ctx: ctx,
|
cancel: cancel,
|
||||||
cancel: cancel,
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -62,36 +54,6 @@ func (s *ServiceManager) Stop() {
|
||||||
s.running.Wait()
|
s.running.Wait()
|
||||||
}
|
}
|
||||||
|
|
||||||
// Start starts a background worker goroutine that writes back into the Agent
|
|
||||||
// state. This only exists to keep the need to lock the agent state lock out of
|
|
||||||
// the main AddService/RemoveService codepaths to avoid deadlocks.
|
|
||||||
func (s *ServiceManager) Start() {
|
|
||||||
s.running.Add(1)
|
|
||||||
|
|
||||||
go func() {
|
|
||||||
defer s.running.Done()
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case <-s.ctx.Done():
|
|
||||||
return
|
|
||||||
case req := <-s.registerCh:
|
|
||||||
req.Reply <- s.registerOnce(req.Args)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
}
|
|
||||||
|
|
||||||
// runOnce will process a single registration request
|
|
||||||
func (s *ServiceManager) registerOnce(args addServiceInternalRequest) error {
|
|
||||||
s.agent.stateLock.Lock()
|
|
||||||
defer s.agent.stateLock.Unlock()
|
|
||||||
|
|
||||||
if err := s.agent.addServiceInternal(args); err != nil {
|
|
||||||
return fmt.Errorf("error updating service registration: %v", err)
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// AddService will (re)create a serviceConfigWatch on the given service. For
|
// AddService will (re)create a serviceConfigWatch on the given service. For
|
||||||
// each call of this function the first registration will happen inline and
|
// each call of this function the first registration will happen inline and
|
||||||
// will read the merged global defaults for the service through the agent cache
|
// will read the merged global defaults for the service through the agent cache
|
||||||
|
@ -129,12 +91,11 @@ func (s *ServiceManager) AddService(req addServiceLockedRequest) error {
|
||||||
|
|
||||||
// Get the existing global config and do the initial registration with the
|
// Get the existing global config and do the initial registration with the
|
||||||
// merged config.
|
// merged config.
|
||||||
watch := &serviceConfigWatch{
|
watch := &serviceConfigWatch{registration: req, agent: s.agent}
|
||||||
registration: req,
|
if err := watch.register(s.ctx); err != nil {
|
||||||
agent: s.agent,
|
return err
|
||||||
registerCh: s.registerCh,
|
|
||||||
}
|
}
|
||||||
if err := watch.RegisterAndStart(s.ctx, &s.running); err != nil {
|
if err := watch.start(s.ctx, &s.running); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -165,9 +126,7 @@ func (s *ServiceManager) RemoveService(serviceID structs.ServiceID) {
|
||||||
// service/proxy defaults.
|
// service/proxy defaults.
|
||||||
type serviceConfigWatch struct {
|
type serviceConfigWatch struct {
|
||||||
registration addServiceLockedRequest
|
registration addServiceLockedRequest
|
||||||
|
agent *Agent
|
||||||
agent *Agent
|
|
||||||
registerCh chan<- *asyncRegisterRequest
|
|
||||||
|
|
||||||
// cacheKey stores the key of the current request, when registration changes
|
// cacheKey stores the key of the current request, when registration changes
|
||||||
// we check to see if a new cache watch is needed.
|
// we check to see if a new cache watch is needed.
|
||||||
|
@ -178,7 +137,7 @@ type serviceConfigWatch struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
// NOTE: this is called while holding the Agent.stateLock
|
// NOTE: this is called while holding the Agent.stateLock
|
||||||
func (w *serviceConfigWatch) RegisterAndStart(ctx context.Context, wg *sync.WaitGroup) error {
|
func (w *serviceConfigWatch) register(ctx context.Context) error {
|
||||||
serviceDefaults, err := w.registration.serviceDefaults(ctx)
|
serviceDefaults, err := w.registration.serviceDefaults(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("could not retrieve initial service_defaults config for service %q: %v",
|
return fmt.Errorf("could not retrieve initial service_defaults config for service %q: %v",
|
||||||
|
@ -204,10 +163,7 @@ func (w *serviceConfigWatch) RegisterAndStart(ctx context.Context, wg *sync.Wait
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("error updating service registration: %v", err)
|
return fmt.Errorf("error updating service registration: %v", err)
|
||||||
}
|
}
|
||||||
|
return nil
|
||||||
// Start the config watch, which starts a blocking query for the
|
|
||||||
// resolved service config in the background.
|
|
||||||
return w.start(ctx, wg)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func serviceDefaultsFromStruct(v *structs.ServiceConfigResponse) func(context.Context) (*structs.ServiceConfigResponse, error) {
|
func serviceDefaultsFromStruct(v *structs.ServiceConfigResponse) func(context.Context) (*structs.ServiceConfigResponse, error) {
|
||||||
|
@ -256,13 +212,7 @@ func (w *serviceConfigWatch) start(ctx context.Context, wg *sync.WaitGroup) erro
|
||||||
// context before we cancel and so might still deliver the old event. Using
|
// context before we cancel and so might still deliver the old event. Using
|
||||||
// the cacheKey allows us to ignore updates from the old cache watch and makes
|
// the cacheKey allows us to ignore updates from the old cache watch and makes
|
||||||
// even this rare edge case safe.
|
// even this rare edge case safe.
|
||||||
err := w.agent.cache.Notify(
|
err := w.agent.cache.Notify(ctx, cachetype.ResolvedServiceConfigName, req, w.cacheKey, updateCh)
|
||||||
ctx,
|
|
||||||
cachetype.ResolvedServiceConfigName,
|
|
||||||
req,
|
|
||||||
w.cacheKey,
|
|
||||||
updateCh,
|
|
||||||
)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
w.cancelFunc()
|
w.cancelFunc()
|
||||||
return err
|
return err
|
||||||
|
@ -331,47 +281,31 @@ func (w *serviceConfigWatch) handleUpdate(ctx context.Context, event cache.Updat
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// While we were waiting on the agent state lock we may have been shutdown.
|
|
||||||
// So avoid doing a registration in that case.
|
|
||||||
if err := ctx.Err(); err != nil {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// make a copy of the AddServiceRequest
|
// make a copy of the AddServiceRequest
|
||||||
req := w.registration
|
req := w.registration
|
||||||
req.Service = merged
|
req.Service = merged
|
||||||
req.persistServiceConfig = true
|
req.persistServiceConfig = true
|
||||||
|
|
||||||
registerReq := &asyncRegisterRequest{
|
args := addServiceInternalRequest{
|
||||||
Args: addServiceInternalRequest{
|
addServiceLockedRequest: req,
|
||||||
addServiceLockedRequest: req,
|
persistService: w.registration.Service,
|
||||||
persistService: w.registration.Service,
|
persistServiceDefaults: serviceDefaults,
|
||||||
persistServiceDefaults: serviceDefaults,
|
|
||||||
},
|
|
||||||
Reply: make(chan error, 1),
|
|
||||||
}
|
}
|
||||||
|
|
||||||
select {
|
if err := w.agent.stateLock.TryLock(ctx); err != nil {
|
||||||
case <-ctx.Done():
|
|
||||||
return nil
|
|
||||||
case w.registerCh <- registerReq:
|
|
||||||
}
|
|
||||||
|
|
||||||
select {
|
|
||||||
case <-ctx.Done():
|
|
||||||
return nil
|
|
||||||
|
|
||||||
case err := <-registerReq.Reply:
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("error updating service registration: %v", err)
|
|
||||||
}
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
}
|
defer w.agent.stateLock.Unlock()
|
||||||
|
|
||||||
type asyncRegisterRequest struct {
|
// The context may have been cancelled after the lock was acquired.
|
||||||
Args addServiceInternalRequest
|
if err := ctx.Err(); err != nil {
|
||||||
Reply chan error
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := w.agent.addServiceInternal(args); err != nil {
|
||||||
|
return fmt.Errorf("error updating service registration: %v", err)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func makeConfigRequest(bd BaseDeps, addReq AddServiceRequest) *structs.ServiceConfigRequest {
|
func makeConfigRequest(bd BaseDeps, addReq AddServiceRequest) *structs.ServiceConfigRequest {
|
||||||
|
|
|
@ -387,12 +387,19 @@ func TestServiceManager_PersistService_API(t *testing.T) {
|
||||||
configFile := filepath.Join(a.Config.DataDir, serviceConfigDir, svcID.StringHash())
|
configFile := filepath.Join(a.Config.DataDir, serviceConfigDir, svcID.StringHash())
|
||||||
|
|
||||||
// Service is not persisted unless requested, but we always persist service configs.
|
// Service is not persisted unless requested, but we always persist service configs.
|
||||||
require.NoError(a.addServiceFromSource(svc, nil, false, "", ConfigSourceRemote))
|
err = a.AddService(AddServiceRequest{Service: svc, Source: ConfigSourceRemote})
|
||||||
|
require.NoError(err)
|
||||||
requireFileIsAbsent(t, svcFile)
|
requireFileIsAbsent(t, svcFile)
|
||||||
requireFileIsPresent(t, configFile)
|
requireFileIsPresent(t, configFile)
|
||||||
|
|
||||||
// Persists to file if requested
|
// Persists to file if requested
|
||||||
require.NoError(a.addServiceFromSource(svc, nil, true, "mytoken", ConfigSourceRemote))
|
err = a.AddService(AddServiceRequest{
|
||||||
|
Service: svc,
|
||||||
|
persist: true,
|
||||||
|
token: "mytoken",
|
||||||
|
Source: ConfigSourceRemote,
|
||||||
|
})
|
||||||
|
require.NoError(err)
|
||||||
requireFileIsPresent(t, svcFile)
|
requireFileIsPresent(t, svcFile)
|
||||||
requireFileIsPresent(t, configFile)
|
requireFileIsPresent(t, configFile)
|
||||||
|
|
||||||
|
|
2
go.mod
2
go.mod
|
@ -84,7 +84,7 @@ require (
|
||||||
golang.org/x/crypto v0.0.0-20200930160638-afb6bcd081ae
|
golang.org/x/crypto v0.0.0-20200930160638-afb6bcd081ae
|
||||||
golang.org/x/net v0.0.0-20200930145003-4acb6c075d10
|
golang.org/x/net v0.0.0-20200930145003-4acb6c075d10
|
||||||
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45
|
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45
|
||||||
golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a
|
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9
|
||||||
golang.org/x/sys v0.0.0-20201024232916-9f70ab9862d5
|
golang.org/x/sys v0.0.0-20201024232916-9f70ab9862d5
|
||||||
golang.org/x/text v0.3.3 // indirect
|
golang.org/x/text v0.3.3 // indirect
|
||||||
golang.org/x/time v0.0.0-20200630173020-3af7569d3a1e
|
golang.org/x/time v0.0.0-20200630173020-3af7569d3a1e
|
||||||
|
|
4
go.sum
4
go.sum
|
@ -583,8 +583,8 @@ golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJ
|
||||||
golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||||
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||||
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||||
golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a h1:WXEvlFVvvGxCJLG6REjsT03iWnKLEWinaScsxF2Vm2o=
|
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9 h1:SQFwaSi55rU7vdNs9Yr0Z324VNlrF+0wMqRXT4St8ck=
|
||||||
golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||||
golang.org/x/sys v0.0.0-20170830134202-bb24a47a89ea/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
golang.org/x/sys v0.0.0-20170830134202-bb24a47a89ea/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||||
golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||||
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||||
|
|
|
@ -0,0 +1,36 @@
|
||||||
|
/*
|
||||||
|
Package mutex implements the sync.Locker interface using x/sync/semaphore. It
|
||||||
|
may be used as a replacement for sync.Mutex when one or more goroutines need to
|
||||||
|
allow their calls to Lock to be cancelled by context cancellation.
|
||||||
|
*/
|
||||||
|
package mutex
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
|
||||||
|
"golang.org/x/sync/semaphore"
|
||||||
|
)
|
||||||
|
|
||||||
|
type Mutex semaphore.Weighted
|
||||||
|
|
||||||
|
// New returns a Mutex that is ready for use.
|
||||||
|
func New() *Mutex {
|
||||||
|
return (*Mutex)(semaphore.NewWeighted(1))
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *Mutex) Lock() {
|
||||||
|
_ = (*semaphore.Weighted)(m).Acquire(context.Background(), 1)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *Mutex) Unlock() {
|
||||||
|
(*semaphore.Weighted)(m).Release(1)
|
||||||
|
}
|
||||||
|
|
||||||
|
// TryLock acquires the mutex, blocking until resources are available or ctx is
|
||||||
|
// done. On success, returns nil. On failure, returns ctx.Err() and leaves the
|
||||||
|
// semaphore unchanged.
|
||||||
|
//
|
||||||
|
// If ctx is already done, Acquire may still succeed without blocking.
|
||||||
|
func (m *Mutex) TryLock(ctx context.Context) error {
|
||||||
|
return (*semaphore.Weighted)(m).Acquire(ctx, 1)
|
||||||
|
}
|
|
@ -0,0 +1,93 @@
|
||||||
|
package mutex
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestMutex(t *testing.T) {
|
||||||
|
t.Run("starts unlocked", func(t *testing.T) {
|
||||||
|
m := New()
|
||||||
|
canLock(t, m)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("Lock blocks when locked", func(t *testing.T) {
|
||||||
|
m := New()
|
||||||
|
m.Lock()
|
||||||
|
lockIsBlocked(t, m)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("Unlock unblocks Lock", func(t *testing.T) {
|
||||||
|
m := New()
|
||||||
|
m.Lock()
|
||||||
|
m.Unlock() // nolint:staticcheck // SA2001 is not relevant here
|
||||||
|
canLock(t, m)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("TryLock acquires lock", func(t *testing.T) {
|
||||||
|
m := New()
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
|
||||||
|
t.Cleanup(cancel)
|
||||||
|
require.NoError(t, m.TryLock(ctx))
|
||||||
|
lockIsBlocked(t, m)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("TryLock blocks until timeout when locked", func(t *testing.T) {
|
||||||
|
m := New()
|
||||||
|
m.Lock()
|
||||||
|
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), 20*time.Millisecond)
|
||||||
|
t.Cleanup(cancel)
|
||||||
|
err := m.TryLock(ctx)
|
||||||
|
require.Equal(t, err, context.DeadlineExceeded)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("TryLock acquires lock before timeout", func(t *testing.T) {
|
||||||
|
m := New()
|
||||||
|
m.Lock()
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
time.Sleep(20 * time.Millisecond)
|
||||||
|
m.Unlock()
|
||||||
|
}()
|
||||||
|
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
|
||||||
|
t.Cleanup(cancel)
|
||||||
|
err := m.TryLock(ctx)
|
||||||
|
require.NoError(t, err)
|
||||||
|
})
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
func canLock(t *testing.T, m *Mutex) {
|
||||||
|
t.Helper()
|
||||||
|
chDone := make(chan struct{})
|
||||||
|
go func() {
|
||||||
|
m.Lock()
|
||||||
|
close(chDone)
|
||||||
|
}()
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-chDone:
|
||||||
|
case <-time.After(20 * time.Millisecond):
|
||||||
|
t.Fatal("failed to acquire lock before timeout")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func lockIsBlocked(t *testing.T, m *Mutex) {
|
||||||
|
t.Helper()
|
||||||
|
chDone := make(chan struct{})
|
||||||
|
go func() {
|
||||||
|
m.Lock()
|
||||||
|
close(chDone)
|
||||||
|
}()
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-chDone:
|
||||||
|
t.Fatal("expected Lock to block")
|
||||||
|
case <-time.After(20 * time.Millisecond):
|
||||||
|
}
|
||||||
|
}
|
|
@ -6,7 +6,42 @@
|
||||||
// mechanism.
|
// mechanism.
|
||||||
package singleflight // import "golang.org/x/sync/singleflight"
|
package singleflight // import "golang.org/x/sync/singleflight"
|
||||||
|
|
||||||
import "sync"
|
import (
|
||||||
|
"bytes"
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"runtime"
|
||||||
|
"runtime/debug"
|
||||||
|
"sync"
|
||||||
|
)
|
||||||
|
|
||||||
|
// errGoexit indicates the runtime.Goexit was called in
|
||||||
|
// the user given function.
|
||||||
|
var errGoexit = errors.New("runtime.Goexit was called")
|
||||||
|
|
||||||
|
// A panicError is an arbitrary value recovered from a panic
|
||||||
|
// with the stack trace during the execution of given function.
|
||||||
|
type panicError struct {
|
||||||
|
value interface{}
|
||||||
|
stack []byte
|
||||||
|
}
|
||||||
|
|
||||||
|
// Error implements error interface.
|
||||||
|
func (p *panicError) Error() string {
|
||||||
|
return fmt.Sprintf("%v\n\n%s", p.value, p.stack)
|
||||||
|
}
|
||||||
|
|
||||||
|
func newPanicError(v interface{}) error {
|
||||||
|
stack := debug.Stack()
|
||||||
|
|
||||||
|
// The first line of the stack trace is of the form "goroutine N [status]:"
|
||||||
|
// but by the time the panic reaches Do the goroutine may no longer exist
|
||||||
|
// and its status will have changed. Trim out the misleading line.
|
||||||
|
if line := bytes.IndexByte(stack[:], '\n'); line >= 0 {
|
||||||
|
stack = stack[line+1:]
|
||||||
|
}
|
||||||
|
return &panicError{value: v, stack: stack}
|
||||||
|
}
|
||||||
|
|
||||||
// call is an in-flight or completed singleflight.Do call
|
// call is an in-flight or completed singleflight.Do call
|
||||||
type call struct {
|
type call struct {
|
||||||
|
@ -57,6 +92,12 @@ func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, e
|
||||||
c.dups++
|
c.dups++
|
||||||
g.mu.Unlock()
|
g.mu.Unlock()
|
||||||
c.wg.Wait()
|
c.wg.Wait()
|
||||||
|
|
||||||
|
if e, ok := c.err.(*panicError); ok {
|
||||||
|
panic(e)
|
||||||
|
} else if c.err == errGoexit {
|
||||||
|
runtime.Goexit()
|
||||||
|
}
|
||||||
return c.val, c.err, true
|
return c.val, c.err, true
|
||||||
}
|
}
|
||||||
c := new(call)
|
c := new(call)
|
||||||
|
@ -70,6 +111,8 @@ func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, e
|
||||||
|
|
||||||
// DoChan is like Do but returns a channel that will receive the
|
// DoChan is like Do but returns a channel that will receive the
|
||||||
// results when they are ready.
|
// results when they are ready.
|
||||||
|
//
|
||||||
|
// The returned channel will not be closed.
|
||||||
func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result {
|
func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result {
|
||||||
ch := make(chan Result, 1)
|
ch := make(chan Result, 1)
|
||||||
g.mu.Lock()
|
g.mu.Lock()
|
||||||
|
@ -94,17 +137,66 @@ func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result
|
||||||
|
|
||||||
// doCall handles the single call for a key.
|
// doCall handles the single call for a key.
|
||||||
func (g *Group) doCall(c *call, key string, fn func() (interface{}, error)) {
|
func (g *Group) doCall(c *call, key string, fn func() (interface{}, error)) {
|
||||||
c.val, c.err = fn()
|
normalReturn := false
|
||||||
c.wg.Done()
|
recovered := false
|
||||||
|
|
||||||
g.mu.Lock()
|
// use double-defer to distinguish panic from runtime.Goexit,
|
||||||
if !c.forgotten {
|
// more details see https://golang.org/cl/134395
|
||||||
delete(g.m, key)
|
defer func() {
|
||||||
|
// the given function invoked runtime.Goexit
|
||||||
|
if !normalReturn && !recovered {
|
||||||
|
c.err = errGoexit
|
||||||
|
}
|
||||||
|
|
||||||
|
c.wg.Done()
|
||||||
|
g.mu.Lock()
|
||||||
|
defer g.mu.Unlock()
|
||||||
|
if !c.forgotten {
|
||||||
|
delete(g.m, key)
|
||||||
|
}
|
||||||
|
|
||||||
|
if e, ok := c.err.(*panicError); ok {
|
||||||
|
// In order to prevent the waiting channels from being blocked forever,
|
||||||
|
// needs to ensure that this panic cannot be recovered.
|
||||||
|
if len(c.chans) > 0 {
|
||||||
|
go panic(e)
|
||||||
|
select {} // Keep this goroutine around so that it will appear in the crash dump.
|
||||||
|
} else {
|
||||||
|
panic(e)
|
||||||
|
}
|
||||||
|
} else if c.err == errGoexit {
|
||||||
|
// Already in the process of goexit, no need to call again
|
||||||
|
} else {
|
||||||
|
// Normal return
|
||||||
|
for _, ch := range c.chans {
|
||||||
|
ch <- Result{c.val, c.err, c.dups > 0}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
func() {
|
||||||
|
defer func() {
|
||||||
|
if !normalReturn {
|
||||||
|
// Ideally, we would wait to take a stack trace until we've determined
|
||||||
|
// whether this is a panic or a runtime.Goexit.
|
||||||
|
//
|
||||||
|
// Unfortunately, the only way we can distinguish the two is to see
|
||||||
|
// whether the recover stopped the goroutine from terminating, and by
|
||||||
|
// the time we know that, the part of the stack trace relevant to the
|
||||||
|
// panic has been discarded.
|
||||||
|
if r := recover(); r != nil {
|
||||||
|
c.err = newPanicError(r)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
c.val, c.err = fn()
|
||||||
|
normalReturn = true
|
||||||
|
}()
|
||||||
|
|
||||||
|
if !normalReturn {
|
||||||
|
recovered = true
|
||||||
}
|
}
|
||||||
for _, ch := range c.chans {
|
|
||||||
ch <- Result{c.val, c.err, c.dups > 0}
|
|
||||||
}
|
|
||||||
g.mu.Unlock()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Forget tells the singleflight to forget about a key. Future calls
|
// Forget tells the singleflight to forget about a key. Future calls
|
||||||
|
|
|
@ -505,7 +505,7 @@ golang.org/x/oauth2/google
|
||||||
golang.org/x/oauth2/internal
|
golang.org/x/oauth2/internal
|
||||||
golang.org/x/oauth2/jws
|
golang.org/x/oauth2/jws
|
||||||
golang.org/x/oauth2/jwt
|
golang.org/x/oauth2/jwt
|
||||||
# golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a
|
# golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9
|
||||||
golang.org/x/sync/errgroup
|
golang.org/x/sync/errgroup
|
||||||
golang.org/x/sync/semaphore
|
golang.org/x/sync/semaphore
|
||||||
golang.org/x/sync/singleflight
|
golang.org/x/sync/singleflight
|
||||||
|
|
Loading…
Reference in New Issue