consul/lib/routine/routine.go
hashicorp-copywrite[bot] 5fb9df1640
[COMPLIANCE] License changes (#18443)
* Adding explicit MPL license for sub-package

This directory and its subdirectories (packages) contain files licensed with the MPLv2 `LICENSE` file in this directory and are intentionally licensed separately from the BSL `LICENSE` file at the root of this repository.

* Adding explicit MPL license for sub-package

This directory and its subdirectories (packages) contain files licensed with the MPLv2 `LICENSE` file in this directory and are intentionally licensed separately from the BSL `LICENSE` file at the root of this repository.

* Updating the license from MPL to Business Source License

Going forward, this project will be licensed under the Business Source License v1.1. Please see our blog post for more details at <Blog URL>, FAQ at www.hashicorp.com/licensing-faq, and details of the license at www.hashicorp.com/bsl.

* add missing license headers

* Update copyright file headers to BUSL-1.1

* Update copyright file headers to BUSL-1.1

* Update copyright file headers to BUSL-1.1

* Update copyright file headers to BUSL-1.1

* Update copyright file headers to BUSL-1.1

* Update copyright file headers to BUSL-1.1

* Update copyright file headers to BUSL-1.1

* Update copyright file headers to BUSL-1.1

* Update copyright file headers to BUSL-1.1

* Update copyright file headers to BUSL-1.1

* Update copyright file headers to BUSL-1.1

* Update copyright file headers to BUSL-1.1

* Update copyright file headers to BUSL-1.1

* Update copyright file headers to BUSL-1.1

* Update copyright file headers to BUSL-1.1

---------

Co-authored-by: hashicorp-copywrite[bot] <110428419+hashicorp-copywrite[bot]@users.noreply.github.com>
2023-08-11 09:12:13 -04:00

177 lines
3.8 KiB
Go

// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1
package routine
import (
"context"
"os"
"sync"
"github.com/hashicorp/go-hclog"
)
type Routine func(ctx context.Context) error
// cancelCh is the ctx.Done()
// When cancel() is called, if the routine is running a blocking call (e.g. some ACL replication RPCs),
// stoppedCh won't be closed till the blocking call returns, while cancelCh will be closed immediately.
// cancelCh is used to properly detect routine running status between cancel() and close(stoppedCh)
type routineTracker struct {
cancel context.CancelFunc
cancelCh <-chan struct{} // closed when ctx is done
stoppedCh chan struct{} // closed when no longer running
}
func (r *routineTracker) running() bool {
select {
case <-r.stoppedCh:
return false
case <-r.cancelCh:
return false
default:
return true
}
}
func (r *routineTracker) wait() {
<-r.stoppedCh
}
type Manager struct {
lock sync.RWMutex
logger hclog.Logger
routines map[string]*routineTracker
}
func NewManager(logger hclog.Logger) *Manager {
if logger == nil {
logger = hclog.New(&hclog.LoggerOptions{
Output: os.Stderr,
})
}
return &Manager{
logger: logger,
routines: make(map[string]*routineTracker),
}
}
func (m *Manager) IsRunning(name string) bool {
m.lock.Lock()
defer m.lock.Unlock()
if routine, ok := m.routines[name]; ok {
return routine.running()
}
return false
}
func (m *Manager) Start(ctx context.Context, name string, routine Routine) error {
m.lock.Lock()
defer m.lock.Unlock()
if instance, ok := m.routines[name]; ok && instance.running() {
return nil
}
if ctx == nil {
ctx = context.Background()
}
rtCtx, cancel := context.WithCancel(ctx)
instance := &routineTracker{
cancel: cancel,
cancelCh: ctx.Done(),
stoppedCh: make(chan struct{}),
}
go m.execute(rtCtx, name, routine, instance.stoppedCh)
m.routines[name] = instance
m.logger.Info("started routine", "routine", name)
return nil
}
// execute will run the given routine in the foreground and close the given channel when its done executing
func (m *Manager) execute(ctx context.Context, name string, routine Routine, done chan struct{}) {
defer func() {
close(done)
}()
err := routine(ctx)
if err != nil && err != context.DeadlineExceeded && err != context.Canceled {
m.logger.Error("routine exited with error",
"routine", name,
"error", err,
)
} else {
m.logger.Info("stopped routine", "routine", name)
}
}
// Caveat: The returned stoppedCh indicates that the routine is completed
// It's possible that ctx is canceled, but stoppedCh not yet closed
// Use mgr.IsRunning(name) than this stoppedCh to tell whether the
// instance is still running (not cancelled or completed).
func (m *Manager) Stop(name string) <-chan struct{} {
instance := m.stopInstance(name)
if instance == nil {
// Fabricate a closed channel so it won't block forever.
ch := make(chan struct{})
close(ch)
return ch
}
return instance.stoppedCh
}
func (m *Manager) stopInstance(name string) *routineTracker {
m.lock.Lock()
defer m.lock.Unlock()
instance, ok := m.routines[name]
if !ok {
// no running instance
return nil
}
if !instance.running() {
return instance
}
m.logger.Info("stopping routine", "routine", name)
instance.cancel()
delete(m.routines, name)
return instance
}
// StopAll goroutines. Once StopAll is called, it is no longer safe to add no
// goroutines to the Manager.
func (m *Manager) StopAll() {
m.lock.Lock()
defer m.lock.Unlock()
for name, routine := range m.routines {
if !routine.running() {
continue
}
m.logger.Info("stopping routine", "routine", name)
routine.cancel()
}
}
// Wait for all goroutines to stop after StopAll is called.
func (m *Manager) Wait() {
m.lock.Lock()
defer m.lock.Unlock()
for _, routine := range m.routines {
routine.wait()
}
}