mirror of
https://github.com/status-im/consul.git
synced 2025-01-09 13:26:07 +00:00
5fb9df1640
* Adding explicit MPL license for sub-package This directory and its subdirectories (packages) contain files licensed with the MPLv2 `LICENSE` file in this directory and are intentionally licensed separately from the BSL `LICENSE` file at the root of this repository. * Adding explicit MPL license for sub-package This directory and its subdirectories (packages) contain files licensed with the MPLv2 `LICENSE` file in this directory and are intentionally licensed separately from the BSL `LICENSE` file at the root of this repository. * Updating the license from MPL to Business Source License Going forward, this project will be licensed under the Business Source License v1.1. Please see our blog post for more details at <Blog URL>, FAQ at www.hashicorp.com/licensing-faq, and details of the license at www.hashicorp.com/bsl. * add missing license headers * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 --------- Co-authored-by: hashicorp-copywrite[bot] <110428419+hashicorp-copywrite[bot]@users.noreply.github.com>
113 lines
2.8 KiB
Go
113 lines
2.8 KiB
Go
// Copyright (c) HashiCorp, Inc.
|
|
// SPDX-License-Identifier: BUSL-1.1
|
|
|
|
package controller
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"sync"
|
|
"sync/atomic"
|
|
|
|
"github.com/hashicorp/go-hclog"
|
|
|
|
"github.com/hashicorp/consul/internal/resource"
|
|
"github.com/hashicorp/consul/proto-public/pbresource"
|
|
)
|
|
|
|
// Manager is responsible for scheduling the execution of controllers.
|
|
type Manager struct {
|
|
client pbresource.ResourceServiceClient
|
|
logger hclog.Logger
|
|
|
|
raftLeader atomic.Bool
|
|
|
|
mu sync.Mutex
|
|
running bool
|
|
controllers []Controller
|
|
leaseChans []chan struct{}
|
|
}
|
|
|
|
// NewManager creates a Manager. logger will be used by the Manager, and as the
|
|
// base logger for controllers when one is not specified using WithLogger.
|
|
func NewManager(client pbresource.ResourceServiceClient, logger hclog.Logger) *Manager {
|
|
return &Manager{
|
|
client: client,
|
|
logger: logger,
|
|
}
|
|
}
|
|
|
|
// Register the given controller to be executed by the Manager. Cannot be called
|
|
// once the Manager is running.
|
|
func (m *Manager) Register(ctrl Controller) {
|
|
m.mu.Lock()
|
|
defer m.mu.Unlock()
|
|
|
|
if m.running {
|
|
panic("cannot register additional controllers after calling Run")
|
|
}
|
|
|
|
if ctrl.reconciler == nil {
|
|
panic(fmt.Sprintf("cannot register controller without a reconciler %s", ctrl))
|
|
}
|
|
|
|
m.controllers = append(m.controllers, ctrl)
|
|
}
|
|
|
|
// Run the Manager and start executing controllers until the given context is
|
|
// canceled. Cannot be called more than once.
|
|
func (m *Manager) Run(ctx context.Context) {
|
|
m.mu.Lock()
|
|
defer m.mu.Unlock()
|
|
|
|
if m.running {
|
|
panic("cannot call Run more than once")
|
|
}
|
|
m.running = true
|
|
|
|
for _, desc := range m.controllers {
|
|
logger := desc.logger
|
|
if logger == nil {
|
|
logger = m.logger.With("managed_type", resource.ToGVK(desc.managedType))
|
|
}
|
|
|
|
runner := &controllerRunner{
|
|
ctrl: desc,
|
|
client: m.client,
|
|
logger: logger,
|
|
}
|
|
go newSupervisor(runner.run, m.newLeaseLocked(desc)).run(ctx)
|
|
}
|
|
}
|
|
|
|
// SetRaftLeader notifies the Manager of Raft leadership changes. Controllers
|
|
// are currently only executed on the Raft leader, so calling this method will
|
|
// cause the Manager to spin them up/down accordingly.
|
|
func (m *Manager) SetRaftLeader(leader bool) {
|
|
m.raftLeader.Store(leader)
|
|
|
|
m.mu.Lock()
|
|
defer m.mu.Unlock()
|
|
|
|
for _, ch := range m.leaseChans {
|
|
select {
|
|
case ch <- struct{}{}:
|
|
default:
|
|
// Do not block if there's nothing receiving on ch (because the supervisor is
|
|
// busy doing something else). Note that ch has a buffer of 1, so we'll never
|
|
// miss the notification that something has changed so we need to re-evaluate
|
|
// the lease.
|
|
}
|
|
}
|
|
}
|
|
|
|
func (m *Manager) newLeaseLocked(ctrl Controller) Lease {
|
|
if ctrl.placement == PlacementEachServer {
|
|
return eternalLease{}
|
|
}
|
|
|
|
ch := make(chan struct{}, 1)
|
|
m.leaseChans = append(m.leaseChans, ch)
|
|
return &raftLease{m: m, ch: ch}
|
|
}
|