Watch the ProxyTracker from xDS controller (#18611)

This commit is contained in:
Ashwin Venkatesh 2023-08-29 17:39:29 -04:00 committed by GitHub
parent 0e606504bc
commit 797e42dc24
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
23 changed files with 137 additions and 69 deletions

View File

@ -9,9 +9,6 @@ import (
"encoding/json"
"errors"
"fmt"
catalogproxycfg "github.com/hashicorp/consul/agent/proxycfg-sources/catalog"
proxytracker "github.com/hashicorp/consul/internal/mesh/proxy-tracker"
"github.com/hashicorp/consul/lib/stringslice"
"io"
"net"
"net/http"
@ -57,6 +54,7 @@ import (
"github.com/hashicorp/consul/agent/local"
"github.com/hashicorp/consul/agent/proxycfg"
proxycfgglue "github.com/hashicorp/consul/agent/proxycfg-glue"
catalogproxycfg "github.com/hashicorp/consul/agent/proxycfg-sources/catalog"
localproxycfg "github.com/hashicorp/consul/agent/proxycfg-sources/local"
"github.com/hashicorp/consul/agent/rpcclient"
"github.com/hashicorp/consul/agent/rpcclient/configentry"
@ -67,11 +65,13 @@ import (
"github.com/hashicorp/consul/agent/xds"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/api/watch"
proxytracker "github.com/hashicorp/consul/internal/mesh/proxy-tracker"
"github.com/hashicorp/consul/ipaddr"
"github.com/hashicorp/consul/lib"
"github.com/hashicorp/consul/lib/file"
"github.com/hashicorp/consul/lib/mutex"
"github.com/hashicorp/consul/lib/routine"
"github.com/hashicorp/consul/lib/stringslice"
"github.com/hashicorp/consul/logging"
"github.com/hashicorp/consul/proto-public/pbresource"
"github.com/hashicorp/consul/proto/private/pboperator"

View File

@ -8,7 +8,6 @@ import (
"crypto/x509"
"errors"
"fmt"
proxysnapshot "github.com/hashicorp/consul/internal/mesh/proxy-snapshot"
"io"
"net"
"os"
@ -74,6 +73,7 @@ import (
"github.com/hashicorp/consul/internal/catalog"
"github.com/hashicorp/consul/internal/controller"
"github.com/hashicorp/consul/internal/mesh"
proxysnapshot "github.com/hashicorp/consul/internal/mesh/proxy-snapshot"
"github.com/hashicorp/consul/internal/resource"
"github.com/hashicorp/consul/internal/resource/demo"
"github.com/hashicorp/consul/internal/resource/reaper"

View File

@ -6,9 +6,6 @@ package catalog
import (
"context"
"errors"
"github.com/hashicorp/consul/agent/grpc-external/limiter"
proxysnapshot "github.com/hashicorp/consul/internal/mesh/proxy-snapshot"
"github.com/hashicorp/consul/proto-public/pbresource"
"sync"
"github.com/hashicorp/go-hclog"
@ -16,9 +13,12 @@ import (
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/configentry"
"github.com/hashicorp/consul/agent/grpc-external/limiter"
"github.com/hashicorp/consul/agent/local"
"github.com/hashicorp/consul/agent/proxycfg"
"github.com/hashicorp/consul/agent/structs"
proxysnapshot "github.com/hashicorp/consul/internal/mesh/proxy-snapshot"
"github.com/hashicorp/consul/proto-public/pbresource"
)
const source proxycfg.ProxySource = "catalog"

View File

@ -6,8 +6,6 @@ package catalog
import (
"context"
"errors"
proxysnapshot "github.com/hashicorp/consul/internal/mesh/proxy-snapshot"
rtest "github.com/hashicorp/consul/internal/resource/resourcetest"
"testing"
"time"
@ -23,6 +21,8 @@ import (
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/agent/token"
"github.com/hashicorp/consul/internal/mesh"
proxysnapshot "github.com/hashicorp/consul/internal/mesh/proxy-snapshot"
rtest "github.com/hashicorp/consul/internal/resource/resourcetest"
)
func TestConfigSource_Success(t *testing.T) {

View File

@ -1,12 +1,13 @@
// Code generated by mockery v2.32.4. DO NOT EDIT.
// Code generated by mockery v2.33.1. DO NOT EDIT.
package catalog
import (
proxycfg "github.com/hashicorp/consul/agent/proxycfg"
"github.com/hashicorp/consul/internal/mesh/proxy-snapshot"
mock "github.com/stretchr/testify/mock"
proxysnapshot "github.com/hashicorp/consul/internal/mesh/proxy-snapshot"
structs "github.com/hashicorp/consul/agent/structs"
)

View File

@ -1,11 +1,11 @@
// Code generated by mockery v2.32.4. DO NOT EDIT.
// Code generated by mockery v2.33.1. DO NOT EDIT.
package catalog
import (
context "context"
"github.com/hashicorp/consul/agent/grpc-external/limiter"
limiter "github.com/hashicorp/consul/agent/grpc-external/limiter"
mock "github.com/stretchr/testify/mock"
)

View File

@ -1,13 +1,14 @@
// Code generated by mockery v2.32.4. DO NOT EDIT.
// Code generated by mockery v2.33.1. DO NOT EDIT.
package catalog
import (
"github.com/hashicorp/consul/agent/grpc-external/limiter"
"github.com/hashicorp/consul/internal/mesh/proxy-snapshot"
limiter "github.com/hashicorp/consul/agent/grpc-external/limiter"
mock "github.com/stretchr/testify/mock"
pbresource "github.com/hashicorp/consul/proto-public/pbresource"
proxysnapshot "github.com/hashicorp/consul/internal/mesh/proxy-snapshot"
)
// MockWatcher is an autogenerated mock type for the Watcher type

View File

@ -1,12 +1,13 @@
// Code generated by mockery v2.32.4. DO NOT EDIT.
// Code generated by mockery v2.33.1. DO NOT EDIT.
package local
import (
proxycfg "github.com/hashicorp/consul/agent/proxycfg"
"github.com/hashicorp/consul/internal/mesh/proxy-snapshot"
mock "github.com/stretchr/testify/mock"
proxysnapshot "github.com/hashicorp/consul/internal/mesh/proxy-snapshot"
structs "github.com/hashicorp/consul/agent/structs"
)

View File

@ -5,7 +5,6 @@ package proxycfg
import (
"errors"
proxysnapshot "github.com/hashicorp/consul/internal/mesh/proxy-snapshot"
"runtime/debug"
"sync"
@ -13,6 +12,7 @@ import (
"golang.org/x/time/rate"
"github.com/hashicorp/consul/agent/structs"
proxysnapshot "github.com/hashicorp/consul/internal/mesh/proxy-snapshot"
"github.com/hashicorp/consul/tlsutil"
)

View File

@ -4,7 +4,6 @@
package proxycfg
import (
proxysnapshot "github.com/hashicorp/consul/internal/mesh/proxy-snapshot"
"testing"
"time"
@ -18,6 +17,7 @@ import (
"github.com/hashicorp/consul/agent/proxycfg/internal/watch"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/api"
proxysnapshot "github.com/hashicorp/consul/internal/mesh/proxy-snapshot"
"github.com/hashicorp/consul/proto/private/pbpeering"
"github.com/hashicorp/consul/sdk/testutil"
)

View File

@ -5,10 +5,6 @@ package agent
import (
"encoding/json"
"github.com/hashicorp/consul/agent/grpc-external/limiter"
"github.com/hashicorp/consul/internal/mesh"
proxysnapshot "github.com/hashicorp/consul/internal/mesh/proxy-snapshot"
rtest "github.com/hashicorp/consul/internal/resource/resourcetest"
"net/http"
"net/http/httptest"
"testing"
@ -16,8 +12,12 @@ import (
"github.com/stretchr/testify/require"
"github.com/hashicorp/consul/agent/grpc-external/limiter"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/internal/mesh"
proxysnapshot "github.com/hashicorp/consul/internal/mesh/proxy-snapshot"
rtest "github.com/hashicorp/consul/internal/resource/resourcetest"
"github.com/hashicorp/consul/testrpc"
)

View File

@ -8,12 +8,6 @@ import (
"encoding/hex"
"errors"
"fmt"
"github.com/hashicorp/consul/agent/xds/configfetcher"
"github.com/hashicorp/consul/agent/xdsv2"
"github.com/hashicorp/consul/internal/mesh"
proxysnapshot "github.com/hashicorp/consul/internal/mesh/proxy-snapshot"
proxytracker "github.com/hashicorp/consul/internal/mesh/proxy-tracker"
"github.com/hashicorp/consul/proto-public/pbresource"
"strconv"
"sync"
"sync/atomic"
@ -25,7 +19,6 @@ import (
envoy_discovery_v3 "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
"github.com/hashicorp/go-hclog"
goversion "github.com/hashicorp/go-version"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/proto"
@ -35,10 +28,16 @@ import (
external "github.com/hashicorp/consul/agent/grpc-external"
"github.com/hashicorp/consul/agent/grpc-external/limiter"
"github.com/hashicorp/consul/agent/proxycfg"
"github.com/hashicorp/consul/agent/xds/configfetcher"
"github.com/hashicorp/consul/agent/xds/extensionruntime"
"github.com/hashicorp/consul/agent/xdsv2"
"github.com/hashicorp/consul/envoyextensions/extensioncommon"
"github.com/hashicorp/consul/envoyextensions/xdscommon"
"github.com/hashicorp/consul/internal/mesh"
proxysnapshot "github.com/hashicorp/consul/internal/mesh/proxy-snapshot"
proxytracker "github.com/hashicorp/consul/internal/mesh/proxy-tracker"
"github.com/hashicorp/consul/logging"
"github.com/hashicorp/consul/proto-public/pbresource"
"github.com/hashicorp/consul/version"
)

View File

@ -5,13 +5,15 @@ package proxystateconverter
import (
"fmt"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/consul/agent/proxycfg"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/agent/xds/configfetcher"
proxytracker "github.com/hashicorp/consul/internal/mesh/proxy-tracker"
pbmesh "github.com/hashicorp/consul/proto-public/pbmesh/v1alpha1"
"github.com/hashicorp/consul/proto-public/pbmesh/v1alpha1/pbproxystate"
"github.com/hashicorp/go-hclog"
)
// Converter converts a single snapshot into a ProxyState.

View File

@ -6,19 +6,12 @@ package xds
import (
"context"
"errors"
"github.com/hashicorp/consul/agent/grpc-external/limiter"
"github.com/hashicorp/consul/internal/mesh/proxy-snapshot"
"github.com/hashicorp/consul/proto-public/pbresource"
"sync/atomic"
"time"
envoy_discovery_v3 "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
"github.com/hashicorp/consul/agent/xds/configfetcher"
"github.com/hashicorp/consul/envoyextensions/xdscommon"
"github.com/armon/go-metrics"
"github.com/armon/go-metrics/prometheus"
envoy_discovery_v3 "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
"github.com/hashicorp/go-hclog"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
@ -26,6 +19,11 @@ import (
"github.com/hashicorp/consul/acl"
external "github.com/hashicorp/consul/agent/grpc-external"
"github.com/hashicorp/consul/agent/grpc-external/limiter"
"github.com/hashicorp/consul/agent/xds/configfetcher"
"github.com/hashicorp/consul/envoyextensions/xdscommon"
"github.com/hashicorp/consul/internal/mesh/proxy-snapshot"
"github.com/hashicorp/consul/proto-public/pbresource"
)
var (

View File

@ -4,19 +4,12 @@
package xds
import (
"github.com/hashicorp/consul/agent/proxycfg-sources/catalog"
"github.com/hashicorp/consul/agent/xds/response"
"github.com/hashicorp/consul/internal/mesh/proxy-snapshot"
"github.com/hashicorp/consul/proto-public/pbresource"
"sort"
"sync"
"testing"
"time"
"github.com/hashicorp/consul/agent/connect"
"github.com/hashicorp/consul/agent/grpc-external/limiter"
"github.com/hashicorp/consul/envoyextensions/xdscommon"
"github.com/armon/go-metrics"
envoy_cluster_v3 "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
envoy_core_v3 "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
envoy_endpoint_v3 "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3"
@ -31,8 +24,6 @@ import (
envoy_upstreams_v3 "github.com/envoyproxy/go-control-plane/envoy/extensions/upstreams/http/v3"
envoy_discovery_v3 "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
envoy_type_v3 "github.com/envoyproxy/go-control-plane/envoy/type/v3"
"github.com/armon/go-metrics"
"github.com/mitchellh/copystructure"
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/proto"
@ -40,8 +31,15 @@ import (
"google.golang.org/protobuf/types/known/durationpb"
"google.golang.org/protobuf/types/known/wrapperspb"
"github.com/hashicorp/consul/agent/connect"
"github.com/hashicorp/consul/agent/grpc-external/limiter"
"github.com/hashicorp/consul/agent/proxycfg"
"github.com/hashicorp/consul/agent/proxycfg-sources/catalog"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/agent/xds/response"
"github.com/hashicorp/consul/envoyextensions/xdscommon"
"github.com/hashicorp/consul/internal/mesh/proxy-snapshot"
"github.com/hashicorp/consul/proto-public/pbresource"
"github.com/hashicorp/consul/sdk/testutil"
)

View File

@ -5,11 +5,12 @@ package xdsv2
import (
"fmt"
proxytracker "github.com/hashicorp/consul/internal/mesh/proxy-tracker"
"github.com/hashicorp/go-hclog"
"google.golang.org/protobuf/proto"
"github.com/hashicorp/consul/envoyextensions/xdscommon"
proxytracker "github.com/hashicorp/consul/internal/mesh/proxy-tracker"
)
// ResourceGenerator is associated with a single gRPC stream and creates xDS

View File

@ -5,6 +5,7 @@ package xds
import (
"context"
"github.com/hashicorp/consul/internal/catalog"
"github.com/hashicorp/consul/internal/controller"
"github.com/hashicorp/consul/internal/mesh/internal/controllers/xds/status"
@ -20,13 +21,13 @@ import (
const ControllerName = "consul.io/xds-controller"
func Controller(mapper *bimapper.Mapper, updater ProxyUpdater, fetcher TrustBundleFetcher) controller.Controller {
//if mapper == nil || updater == nil || fetcher == nil {
if mapper == nil || fetcher == nil {
if mapper == nil || updater == nil || fetcher == nil {
panic("mapper, updater and fetcher are required")
}
return controller.ForType(types.ProxyStateTemplateType).
WithWatch(catalog.ServiceEndpointsType, mapper.MapLink).
WithCustomWatch(proxySource(updater), proxyMapper).
WithPlacement(controller.PlacementEachServer).
WithReconciler(&xdsReconciler{bimapper: mapper, updater: updater, fetchTrustBundle: fetcher})
}
@ -47,6 +48,9 @@ type ProxyUpdater interface {
// ProxyConnectedToServer returns whether this id is connected to this server.
ProxyConnectedToServer(id *pbresource.ID) bool
// EventChannel returns a channel of events that are consumed by the Custom Watcher.
EventChannel() chan controller.Event
}
func (r *xdsReconciler) Reconcile(ctx context.Context, rt controller.Runtime, req controller.Request) error {

View File

@ -15,6 +15,7 @@ import (
"github.com/hashicorp/consul/internal/controller"
"github.com/hashicorp/consul/internal/mesh/internal/controllers/xds/status"
"github.com/hashicorp/consul/internal/mesh/internal/types"
proxytracker "github.com/hashicorp/consul/internal/mesh/proxy-tracker"
"github.com/hashicorp/consul/internal/resource"
"github.com/hashicorp/consul/internal/resource/mappers/bimapper"
"github.com/hashicorp/consul/internal/resource/resourcetest"
@ -432,6 +433,37 @@ func (suite *xdsControllerTestSuite) TestController_ComputeAddUpdateEndpoints()
}
// Sets up a full controller, and tests that reconciles are getting triggered for the events it should.
func (suite *xdsControllerTestSuite) TestController_ComputeEndpointForProxyConnections() {
// Run the controller manager.
mgr := controller.NewManager(suite.client, suite.runtime.Logger)
mgr.Register(Controller(suite.mapper, suite.updater, suite.fetcher))
mgr.SetRaftLeader(true)
go mgr.Run(suite.ctx)
// Set up fooEndpoints and fooProxyStateTemplate with a reference to fooEndpoints. These need to be stored
// because the controller reconcile looks them up.
suite.setupFooProxyStateTemplateAndEndpoints()
// Assert that the expected ProxyState matches the actual ProxyState that PushChange was called with. This needs to
// be in a retry block unlike the Reconcile tests because the controller triggers asynchronously.
retry.Run(suite.T(), func(r *retry.R) {
actualEndpoints := suite.updater.GetEndpoints(suite.fooProxyStateTemplate.Id.Name)
// Assert on the status.
suite.client.RequireStatusCondition(r, suite.fooProxyStateTemplate.Id, ControllerName, status.ConditionAccepted())
// Assert that the endpoints computed in the controller matches the expected endpoints.
prototest.AssertDeepEqual(r, suite.expectedFooProxyStateEndpoints, actualEndpoints)
})
eventChannel := suite.updater.EventChannel()
eventChannel <- controller.Event{Obj: &proxytracker.ProxyConnection{ProxyID: suite.fooProxyStateTemplate.Id}}
// Wait for the proxy state template to be re-evaluated.
proxyStateTemp := suite.client.WaitForNewVersion(suite.T(), suite.fooProxyStateTemplate.Id, suite.fooProxyStateTemplate.Version)
require.NotNil(suite.T(), proxyStateTemp)
}
// Setup: fooProxyStateTemplate with an EndpointsRef to fooEndpoints
// Saves all related resources to the suite so they can be modified if needed.
func (suite *xdsControllerTestSuite) setupFooProxyStateTemplateAndEndpoints() {

View File

@ -5,12 +5,12 @@ package xds
import (
"fmt"
proxysnapshot "github.com/hashicorp/consul/internal/mesh/proxy-snapshot"
proxytracker "github.com/hashicorp/consul/internal/mesh/proxy-tracker"
"sync"
"github.com/hashicorp/consul/internal/controller"
proxysnapshot "github.com/hashicorp/consul/internal/mesh/proxy-snapshot"
proxytracker "github.com/hashicorp/consul/internal/mesh/proxy-tracker"
"github.com/hashicorp/consul/proto-public/pbmesh/v1alpha1/pbproxystate"
"github.com/hashicorp/consul/proto-public/pbresource"
)
@ -23,6 +23,7 @@ type mockUpdater struct {
latestPs map[string]proxysnapshot.ProxySnapshot
notConnected bool
pushChangeError bool
eventChan chan controller.Event
}
func newMockUpdater() *mockUpdater {
@ -63,6 +64,13 @@ func (m *mockUpdater) ProxyConnectedToServer(_ *pbresource.ID) bool {
return true
}
func (m *mockUpdater) EventChannel() chan controller.Event {
if m.eventChan == nil {
m.eventChan = make(chan controller.Event)
}
return m.eventChan
}
func (p *mockUpdater) Get(name string) *proxytracker.ProxyState {
p.lock.Lock()
defer p.lock.Unlock()

View File

@ -0,0 +1,21 @@
package xds
import (
"context"
"fmt"
"github.com/hashicorp/consul/internal/controller"
proxytracker "github.com/hashicorp/consul/internal/mesh/proxy-tracker"
)
func proxySource(updater ProxyUpdater) *controller.Source {
return &controller.Source{Source: updater.EventChannel()}
}
func proxyMapper(ctx context.Context, rt controller.Runtime, event controller.Event) ([]controller.Request, error) {
connection, ok := event.Obj.(*proxytracker.ProxyConnection)
if !ok {
return nil, fmt.Errorf("expected event to be of type *proxytracker.ProxyConnection but was %+v", event)
}
return []controller.Request{{ID: connection.ProxyID}}, nil
}

View File

@ -1,9 +1,9 @@
// Code generated by mockery v2.32.4. DO NOT EDIT.
// Code generated by mockery v2.33.1. DO NOT EDIT.
package proxytracker
import (
"github.com/hashicorp/consul/agent/grpc-external/limiter"
limiter "github.com/hashicorp/consul/agent/grpc-external/limiter"
mock "github.com/stretchr/testify/mock"
)

View File

@ -6,26 +6,26 @@ package proxytracker
import (
"errors"
"fmt"
"github.com/hashicorp/consul/agent/grpc-external/limiter"
"github.com/hashicorp/consul/internal/mesh/internal/types"
proxysnapshot "github.com/hashicorp/consul/internal/mesh/proxy-snapshot"
"github.com/hashicorp/go-hclog"
"sync"
"github.com/hashicorp/consul/internal/controller"
"github.com/hashicorp/consul/internal/resource"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/consul/agent/grpc-external/limiter"
"github.com/hashicorp/consul/internal/controller"
"github.com/hashicorp/consul/internal/mesh/internal/types"
proxysnapshot "github.com/hashicorp/consul/internal/mesh/proxy-snapshot"
"github.com/hashicorp/consul/internal/resource"
"github.com/hashicorp/consul/proto-public/pbresource"
)
// Proxy implements the queue.ItemType interface so that it can be used in a controller.Event.
// ProxyConnection implements the queue.ItemType interface so that it can be used in a controller.Event.
// It is sent on the newProxyConnectionCh channel.
// TODO(ProxyState): needs to support tenancy in the future.
// Key() is current resourceID.Name.
type ProxyConnection struct {
ProxyID *pbresource.ID
}
// Key is current resourceID.Name.
func (e *ProxyConnection) Key() string {
return e.ProxyID.GetName()
}

View File

@ -6,6 +6,11 @@ package proxytracker
import (
"errors"
"fmt"
"testing"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"github.com/hashicorp/consul/agent/grpc-external/limiter"
"github.com/hashicorp/consul/internal/controller"
"github.com/hashicorp/consul/internal/mesh/internal/types"
@ -14,9 +19,6 @@ import (
pbmesh "github.com/hashicorp/consul/proto-public/pbmesh/v1alpha1"
"github.com/hashicorp/consul/proto-public/pbresource"
"github.com/hashicorp/consul/sdk/testutil"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"testing"
)
func TestProxyTracker_Watch(t *testing.T) {