consul/internal/controller/mem_consistency_test.go
Matt Keeler 326c0ecfbe
In-Memory gRPC (#19942)
* Implement In-Process gRPC for use by controller caching/indexing

This replaces the pipe base listener implementation we were previously using. The new style CAN avoid cloning resources which our controller caching/indexing is taking advantage of to not duplicate resource objects in memory.

To maintain safety for controllers and for them to be able to modify data they get back from the cache and the resource service, the client they are presented in their runtime will be wrapped with an autogenerated client which clones request and response messages as they pass through the client.

Another sizable change in this PR is to consolidate how server specific gRPC services get registered and managed. Before this was in a bunch of different methods and it was difficult to track down how gRPC services were registered. Now its all in one place.

* Fix race in tests

* Ensure the resource service is registered to the multiplexed handler for forwarding from client agents

* Expose peer streaming on the internal handler
2024-01-12 11:54:07 -05:00

344 lines
11 KiB
Go

// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1
package controller
import (
"context"
"sync"
"testing"
"time"
mockpbresource "github.com/hashicorp/consul/grpcmocks/proto-public/pbresource"
"github.com/hashicorp/consul/internal/resource"
"github.com/hashicorp/consul/internal/resource/resourcetest"
"github.com/hashicorp/consul/internal/storage"
"github.com/hashicorp/consul/proto-public/pbresource"
"github.com/hashicorp/consul/proto/private/prototest"
"github.com/hashicorp/consul/sdk/testutil"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
)
var (
fakeType = &pbresource.Type{
Group: "testing",
GroupVersion: "v1",
Kind: "Fake",
}
fakeV2Type = &pbresource.Type{
Group: "testing",
GroupVersion: "v2",
Kind: "Fake",
}
)
type memCheckResult struct {
clientGet *pbresource.Resource
clientGetError error
cacheGet *pbresource.Resource
cacheGetError error
}
type memCheckReconciler struct {
mu sync.Mutex
closed bool
reconcileCh chan memCheckResult
mapCh chan memCheckResult
}
func newMemCheckReconciler(t testutil.TestingTB) *memCheckReconciler {
t.Helper()
r := &memCheckReconciler{
reconcileCh: make(chan memCheckResult, 10),
mapCh: make(chan memCheckResult, 10),
}
t.Cleanup(r.Shutdown)
return r
}
func (r *memCheckReconciler) Shutdown() {
r.mu.Lock()
defer r.mu.Unlock()
r.closed = true
close(r.reconcileCh)
close(r.mapCh)
}
func (r *memCheckReconciler) requireNotClosed(t testutil.TestingTB) {
t.Helper()
if r.closed {
require.FailNow(t, "the memCheckReconciler has been closed")
}
}
func (r *memCheckReconciler) checkReconcileResult(t testutil.TestingTB, ctx context.Context, res *pbresource.Resource) {
t.Helper()
r.requireEqualNotSameMemCheckResult(t, ctx, r.reconcileCh, res)
}
func (r *memCheckReconciler) checkMapResult(t testutil.TestingTB, ctx context.Context, res *pbresource.Resource) {
t.Helper()
r.requireEqualNotSameMemCheckResult(t, ctx, r.mapCh, res)
}
func (r *memCheckReconciler) requireEqualNotSameMemCheckResult(t testutil.TestingTB, ctx context.Context, ch <-chan memCheckResult, res *pbresource.Resource) {
t.Helper()
select {
case result := <-ch:
require.NoError(t, result.clientGetError)
require.NoError(t, result.cacheGetError)
// Equal but NotSame means the values are all the same but
// the pointers are different. Note that this probably doesn't
// check that the values within the resource haven't been shallow
// copied but that probably should be checked elsewhere
prototest.AssertDeepEqual(t, res, result.clientGet)
require.NotSame(t, res, result.clientGet)
prototest.AssertDeepEqual(t, res, result.cacheGet)
require.NotSame(t, res, result.cacheGet)
case <-ctx.Done():
require.Fail(t, "didn't receive mem check result before context cancellation", ctx.Err())
}
}
func (r *memCheckReconciler) Reconcile(ctx context.Context, rt Runtime, req Request) error {
r.mu.Lock()
defer r.mu.Unlock()
if !r.closed {
r.getAndSend(ctx, rt, req.ID, r.reconcileCh)
}
return nil
}
func (r *memCheckReconciler) MapToNothing(
ctx context.Context,
rt Runtime,
res *pbresource.Resource,
) ([]Request, error) {
r.mu.Lock()
defer r.mu.Unlock()
if !r.closed {
r.getAndSend(ctx, rt, res.Id, r.mapCh)
}
return nil, nil
}
func (*memCheckReconciler) getAndSend(ctx context.Context, rt Runtime, id *pbresource.ID, ch chan<- memCheckResult) {
var res memCheckResult
response, err := rt.Client.Read(ctx, &pbresource.ReadRequest{
Id: id,
})
res.clientGetError = err
if response != nil {
res.clientGet = response.Resource
}
res.cacheGet, res.cacheGetError = rt.Cache.Get(id.Type, "id", id)
ch <- res
}
func watchListEvents(t testutil.TestingTB, events ...*pbresource.WatchEvent) pbresource.ResourceService_WatchListClient {
t.Helper()
ctx := testutil.TestContext(t)
watchListClient := mockpbresource.NewResourceService_WatchListClient(t)
// Return the events in the specified order as soon as they are requested
for _, event := range events {
watchListClient.EXPECT().
Recv().
RunAndReturn(func() (*pbresource.WatchEvent, error) {
return event, nil
}).
Once()
}
// Now that all specified events have been exhausted we loop until the test finishes
// and the context bound to the tests lifecycle has been cancelled. This prevents getting
// any weird errors from the controller manager/runner.
watchListClient.EXPECT().
Recv().
RunAndReturn(func() (*pbresource.WatchEvent, error) {
<-ctx.Done()
return nil, ctx.Err()
}).
Maybe()
return watchListClient
}
// TestControllerRuntimeMemoryCloning mainly is testing that the runtimes
// provided to reconcilers and dependency mappers will return data from
// the resource service client and the cache that have been cloned so that
// the controller should be free to modify the data as needed.
func TestControllerRuntimeMemoryCloning(t *testing.T) {
ctx := testutil.TestContext(t)
// create some resources to use during the test
res1 := resourcetest.Resource(fakeType, "foo").
WithTenancy(resource.DefaultNamespacedTenancy()).
Build()
res2 := resourcetest.Resource(fakeV2Type, "bar").
WithTenancy(resource.DefaultNamespacedTenancy()).
Build()
// create the reconciler that will read the desired resource
// from both the resource service client and the cache client.
reconciler := newMemCheckReconciler(t)
// Create the v1 watch list client to be returned when the controller runner
// calls WatchList on the v1 testing type.
v1WatchListClient := watchListEvents(t, &pbresource.WatchEvent{
Operation: pbresource.WatchEvent_OPERATION_UPSERT,
Resource: res1,
})
// Create the v2 watch list client to be returned when the controller runner
// calls WatchList on the v2 testing type.
v2WatchListClient := watchListEvents(t, nil, &pbresource.WatchEvent{
Operation: pbresource.WatchEvent_OPERATION_UPSERT,
Resource: res2,
})
// Create the mock resource service client
mres := mockpbresource.NewResourceServiceClient(t)
// Setup the expectation for the controller runner to issue a WatchList
// request for the managed type (fake v2 type)
mres.EXPECT().
WatchList(mock.Anything, &pbresource.WatchListRequest{
Type: fakeV2Type,
Tenancy: &pbresource.Tenancy{
Partition: storage.Wildcard,
PeerName: storage.Wildcard,
Namespace: storage.Wildcard,
},
}).
Return(v2WatchListClient, nil).
Once()
// Setup the expectation for the controller runner to issue a WatchList
// request for the secondary Watch type (fake v1 type)
mres.EXPECT().
WatchList(mock.Anything, &pbresource.WatchListRequest{
Type: fakeType,
Tenancy: &pbresource.Tenancy{
Partition: storage.Wildcard,
PeerName: storage.Wildcard,
Namespace: storage.Wildcard,
},
}).
Return(v1WatchListClient, nil).
Once()
// The cloning resource clients will forward actual calls onto the main resource service client.
// Here we are configuring the service mock to return either of the resources depending on the
// id present in the request.
mres.EXPECT().
Read(mock.Anything, mock.Anything).
RunAndReturn(func(_ context.Context, req *pbresource.ReadRequest, opts ...grpc.CallOption) (*pbresource.ReadResponse, error) {
res := res2
if resource.EqualID(res1.Id, req.Id) {
res = res1
}
return &pbresource.ReadResponse{Resource: res}, nil
}).
Times(0)
// create the test controller
ctl := NewController("test", fakeV2Type).
WithWatch(fakeType, reconciler.MapToNothing).
WithReconciler(reconciler)
// create the controller manager and register our test controller
manager := NewManager(mres, testutil.Logger(t))
manager.Register(ctl)
// run the controller manager
manager.SetRaftLeader(true)
go manager.Run(ctx)
// All future assertions should easily be able to run within 5s although they
// should typically run a couple orders of magnitude faster.
timeLimitedCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
t.Cleanup(cancel)
// validate that the v2 resource type event was seen and that the
// cache and the resource service client return cloned resources
reconciler.checkReconcileResult(t, timeLimitedCtx, res2)
// Validate that the dependency mapper's resource and cache clients return
// cloned resources.
reconciler.checkMapResult(t, timeLimitedCtx, res1)
}
// TestRunnerSharedMemoryCache is mainly testing to ensure that resources
// within the cache are shared with the resource service and have not been
// cloned.
func TestControllerRunnerSharedMemoryCache(t *testing.T) {
ctx := testutil.TestContext(t)
// create resource to use during the test
res := resourcetest.Resource(fakeV2Type, "bar").
WithTenancy(resource.DefaultNamespacedTenancy()).
Build()
// create the reconciler that will read the desired resource
// from both the resource service client and the cache client.
reconciler := newMemCheckReconciler(t)
// Create the v2 watch list client to be returned when the controller runner
// calls WatchList on the v2 testing type.
v2WatchListClient := watchListEvents(t, nil, &pbresource.WatchEvent{
Operation: pbresource.WatchEvent_OPERATION_UPSERT,
Resource: res,
})
// Create the mock resource service client
mres := mockpbresource.NewResourceServiceClient(t)
// Setup the expectation for the controller runner to issue a WatchList
// request for the managed type (fake v2 type)
mres.EXPECT().
WatchList(mock.Anything, &pbresource.WatchListRequest{
Type: fakeV2Type,
Tenancy: &pbresource.Tenancy{
Partition: storage.Wildcard,
PeerName: storage.Wildcard,
Namespace: storage.Wildcard,
},
}).
Return(v2WatchListClient, nil).
Once()
// The cloning resource clients will forward actual calls onto the main resource service client.
// Here we are configuring the service mock to return our singular resource always.
mres.EXPECT().
Read(mock.Anything, mock.Anything).
Return(&pbresource.ReadResponse{Resource: res}, nil).
Times(0)
// create the test controller
ctl := NewController("test", fakeV2Type).
WithReconciler(reconciler)
runner := newControllerRunner(ctl, mres, testutil.Logger(t))
go runner.run(ctx)
// Wait for reconcile to be called before we check the values in
// the cache. This will also validate that the resource service client
// and cache client given to the reconciler cloned the resource but
// that is tested more thoroughly in another test and isn't of primary
// concern here.
reconciler.checkReconcileResult(t, ctx, res)
// Now validate that the cache hold the same resource pointer as the original data
actual, err := runner.cache.Get(fakeV2Type, "id", res.Id)
require.NoError(t, err)
require.Same(t, res, actual)
}