mirror of
https://github.com/status-im/consul.git
synced 2025-01-23 20:19:29 +00:00
5fb9df1640
* Adding explicit MPL license for sub-package This directory and its subdirectories (packages) contain files licensed with the MPLv2 `LICENSE` file in this directory and are intentionally licensed separately from the BSL `LICENSE` file at the root of this repository. * Adding explicit MPL license for sub-package This directory and its subdirectories (packages) contain files licensed with the MPLv2 `LICENSE` file in this directory and are intentionally licensed separately from the BSL `LICENSE` file at the root of this repository. * Updating the license from MPL to Business Source License Going forward, this project will be licensed under the Business Source License v1.1. Please see our blog post for more details at <Blog URL>, FAQ at www.hashicorp.com/licensing-faq, and details of the license at www.hashicorp.com/bsl. * add missing license headers * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 --------- Co-authored-by: hashicorp-copywrite[bot] <110428419+hashicorp-copywrite[bot]@users.noreply.github.com>
143 lines
4.3 KiB
Go
143 lines
4.3 KiB
Go
// Copyright (c) HashiCorp, Inc.
|
|
// SPDX-License-Identifier: BUSL-1.1
|
|
|
|
package local
|
|
|
|
import (
|
|
"context"
|
|
"time"
|
|
|
|
"github.com/hashicorp/go-hclog"
|
|
|
|
"github.com/hashicorp/consul/agent/local"
|
|
"github.com/hashicorp/consul/agent/proxycfg"
|
|
"github.com/hashicorp/consul/agent/structs"
|
|
"github.com/hashicorp/consul/agent/token"
|
|
)
|
|
|
|
const resyncFrequency = 30 * time.Second
|
|
|
|
const source proxycfg.ProxySource = "local"
|
|
|
|
// SyncConfig contains the dependencies required by Sync.
|
|
type SyncConfig struct {
|
|
// Manager is the proxycfg Manager with which proxy services will be registered.
|
|
Manager ConfigManager
|
|
|
|
// State is the agent's local state that will be watched for proxy registrations.
|
|
State *local.State
|
|
|
|
// Tokens is used to retrieve a fallback ACL token if a service is registered
|
|
// without one.
|
|
Tokens *token.Store
|
|
|
|
// NodeName is the name of the local agent node.
|
|
NodeName string
|
|
|
|
// Logger will be used to write log messages.
|
|
Logger hclog.Logger
|
|
|
|
// ResyncFrequency is how often to do a resync and recreate any terminated
|
|
// watches.
|
|
ResyncFrequency time.Duration
|
|
}
|
|
|
|
// Sync watches the agent's local state and registers/deregisters services with
|
|
// the proxycfg Manager ahead-of-time so they're ready immediately when a proxy
|
|
// begins an xDS stream.
|
|
//
|
|
// It runs until the given context is canceled, so should be called it its own
|
|
// goroutine.
|
|
//
|
|
// Note: proxy service definitions from the agent's local state will always
|
|
// overwrite definitions of the same service from other sources (e.g. the
|
|
// catalog).
|
|
func Sync(ctx context.Context, cfg SyncConfig) {
|
|
// Single item buffer is enough since there is no data transferred so this is
|
|
// "level triggering" and we can't miss actual data.
|
|
stateCh := make(chan struct{}, 1)
|
|
|
|
cfg.State.Notify(stateCh)
|
|
defer cfg.State.StopNotify(stateCh)
|
|
|
|
var resyncCh <-chan time.Time
|
|
for {
|
|
sync(cfg)
|
|
|
|
if resyncCh == nil && cfg.ResyncFrequency > 0 {
|
|
resyncCh = time.After(cfg.ResyncFrequency)
|
|
}
|
|
|
|
select {
|
|
case <-stateCh:
|
|
// Wait for a state change.
|
|
case <-resyncCh:
|
|
resyncCh = nil
|
|
case <-ctx.Done():
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func sync(cfg SyncConfig) {
|
|
cfg.Logger.Trace("syncing proxy services from local state")
|
|
|
|
services := cfg.State.AllServices()
|
|
|
|
// Traverse the local state and ensure all proxy services are registered
|
|
for sid, svc := range services {
|
|
if !svc.Kind.IsProxy() {
|
|
continue
|
|
}
|
|
|
|
// Retrieve the token used to register the service, or fallback to the
|
|
// default user token. This token is expected to match the token used in
|
|
// the xDS request for this data.
|
|
token := cfg.State.ServiceToken(sid)
|
|
if token == "" {
|
|
token = cfg.Tokens.UserToken()
|
|
}
|
|
|
|
id := proxycfg.ProxyID{
|
|
ServiceID: sid,
|
|
NodeName: cfg.NodeName,
|
|
|
|
// Note: we *intentionally* don't set Token here. All watches on local
|
|
// services use the same ACL token, regardless of whatever token is
|
|
// presented in the xDS stream (the token presented to the xDS server
|
|
// is checked before the watch is created).
|
|
Token: "",
|
|
}
|
|
|
|
// TODO(banks): need to work out when to default some stuff. For example
|
|
// Proxy.LocalServicePort is practically necessary for any sidecar and can
|
|
// default to the port of the sidecar service, but only if it's already
|
|
// registered and once we get past here, we don't have enough context to
|
|
// know that so we'd need to set it here if not during registration of the
|
|
// proxy service. Sidecar Service in the interim can do that, but we should
|
|
// validate more generally that that is always true.
|
|
err := cfg.Manager.Register(id, svc, source, token, true)
|
|
if err != nil {
|
|
cfg.Logger.Error("failed to watch proxy service",
|
|
"service", sid.String(),
|
|
"error", err,
|
|
)
|
|
}
|
|
}
|
|
|
|
// Now see if any proxies were removed
|
|
for _, proxyID := range cfg.Manager.RegisteredProxies(source) {
|
|
if _, ok := services[proxyID.ServiceID]; !ok {
|
|
cfg.Manager.Deregister(proxyID, source)
|
|
}
|
|
}
|
|
}
|
|
|
|
//go:generate mockery --name ConfigManager --inpackage
|
|
type ConfigManager interface {
|
|
Watch(id proxycfg.ProxyID) (<-chan *proxycfg.ConfigSnapshot, proxycfg.CancelFunc)
|
|
Register(proxyID proxycfg.ProxyID, service *structs.NodeService, source proxycfg.ProxySource, token string, overwrite bool) error
|
|
Deregister(proxyID proxycfg.ProxyID, source proxycfg.ProxySource)
|
|
RegisteredProxies(source proxycfg.ProxySource) []proxycfg.ProxyID
|
|
}
|