mirror of
https://github.com/status-im/consul.git
synced 2025-01-10 13:55:55 +00:00
c029b20615
The new controller caches are initialized before the DependencyMappers or the Reconciler run, but importantly they are not populated. The expectation is that when the WatchList call is made to the resource service it will send an initial snapshot of all resources matching a single type, and then perpetually send UPSERT/DELETE events afterward. This initial snapshot will cycle through the caching layer and will catch it up to reflect the stored data. Critically the dependency mappers and reconcilers will race against the restoration of the caches on server startup or leader election. During this time it is possible a mapper or reconciler will use the cache to lookup a specific relationship and not find it. That very same reconciler may choose to then recompute some persisted resource and in effect rewind it to a prior computed state. Change - Since we are updating the behavior of the WatchList RPC, it was aligned to match that of pbsubscribe and pbpeerstream using a protobuf oneof instead of the enum+fields option. - The WatchList rpc now has 3 alternating response events: Upsert, Delete, EndOfSnapshot. When set the initial batch of "snapshot" Upserts sent on a new watch, those operations will be followed by an EndOfSnapshot event before beginning the never-ending sequence of Upsert/Delete events. - Within the Controller startup code we will launch N+1 goroutines to execute WatchList queries for the watched types. The UPSERTs will be applied to the nascent cache only (no mappers will execute). - Upon witnessing the END operation, those goroutines will terminate. - When all cache priming routines complete, then the normal set of N+1 long lived watch routines will launch to officially witness all events in the system using the primed cached.
375 lines
12 KiB
Go
375 lines
12 KiB
Go
// Copyright (c) HashiCorp, Inc.
|
|
// SPDX-License-Identifier: BUSL-1.1
|
|
|
|
package controller
|
|
|
|
import (
|
|
"context"
|
|
"sync"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/stretchr/testify/mock"
|
|
"github.com/stretchr/testify/require"
|
|
"google.golang.org/grpc"
|
|
|
|
mockpbresource "github.com/hashicorp/consul/grpcmocks/proto-public/pbresource"
|
|
"github.com/hashicorp/consul/internal/resource"
|
|
"github.com/hashicorp/consul/internal/resource/resourcetest"
|
|
"github.com/hashicorp/consul/internal/storage"
|
|
"github.com/hashicorp/consul/proto-public/pbresource"
|
|
"github.com/hashicorp/consul/proto/private/prototest"
|
|
"github.com/hashicorp/consul/sdk/testutil"
|
|
)
|
|
|
|
var (
|
|
fakeType = &pbresource.Type{
|
|
Group: "testing",
|
|
GroupVersion: "v1",
|
|
Kind: "Fake",
|
|
}
|
|
|
|
fakeV2Type = &pbresource.Type{
|
|
Group: "testing",
|
|
GroupVersion: "v2",
|
|
Kind: "Fake",
|
|
}
|
|
)
|
|
|
|
type memCheckResult struct {
|
|
clientGet *pbresource.Resource
|
|
clientGetError error
|
|
cacheGet *pbresource.Resource
|
|
cacheGetError error
|
|
}
|
|
|
|
type memCheckReconciler struct {
|
|
mu sync.Mutex
|
|
closed bool
|
|
reconcileCh chan memCheckResult
|
|
mapCh chan memCheckResult
|
|
}
|
|
|
|
func newMemCheckReconciler(t testutil.TestingTB) *memCheckReconciler {
|
|
t.Helper()
|
|
|
|
r := &memCheckReconciler{
|
|
reconcileCh: make(chan memCheckResult, 10),
|
|
mapCh: make(chan memCheckResult, 10),
|
|
}
|
|
|
|
t.Cleanup(r.Shutdown)
|
|
return r
|
|
}
|
|
|
|
func (r *memCheckReconciler) Shutdown() {
|
|
r.mu.Lock()
|
|
defer r.mu.Unlock()
|
|
r.closed = true
|
|
close(r.reconcileCh)
|
|
close(r.mapCh)
|
|
}
|
|
|
|
func (r *memCheckReconciler) requireNotClosed(t testutil.TestingTB) {
|
|
t.Helper()
|
|
if r.closed {
|
|
require.FailNow(t, "the memCheckReconciler has been closed")
|
|
}
|
|
}
|
|
|
|
func (r *memCheckReconciler) checkReconcileResult(t testutil.TestingTB, ctx context.Context, res *pbresource.Resource) {
|
|
t.Helper()
|
|
r.requireEqualNotSameMemCheckResult(t, ctx, r.reconcileCh, res)
|
|
}
|
|
|
|
func (r *memCheckReconciler) checkMapResult(t testutil.TestingTB, ctx context.Context, res *pbresource.Resource) {
|
|
t.Helper()
|
|
r.requireEqualNotSameMemCheckResult(t, ctx, r.mapCh, res)
|
|
}
|
|
|
|
func (r *memCheckReconciler) requireEqualNotSameMemCheckResult(t testutil.TestingTB, ctx context.Context, ch <-chan memCheckResult, res *pbresource.Resource) {
|
|
t.Helper()
|
|
|
|
select {
|
|
case result := <-ch:
|
|
require.NoError(t, result.clientGetError)
|
|
require.NoError(t, result.cacheGetError)
|
|
// Equal but NotSame means the values are all the same but
|
|
// the pointers are different. Note that this probably doesn't
|
|
// check that the values within the resource haven't been shallow
|
|
// copied but that probably should be checked elsewhere
|
|
prototest.AssertDeepEqual(t, res, result.clientGet)
|
|
require.NotSame(t, res, result.clientGet)
|
|
prototest.AssertDeepEqual(t, res, result.cacheGet)
|
|
require.NotSame(t, res, result.cacheGet)
|
|
case <-ctx.Done():
|
|
require.Fail(t, "didn't receive mem check result before context cancellation", ctx.Err())
|
|
}
|
|
}
|
|
|
|
func (r *memCheckReconciler) Reconcile(ctx context.Context, rt Runtime, req Request) error {
|
|
r.mu.Lock()
|
|
defer r.mu.Unlock()
|
|
if !r.closed {
|
|
r.getAndSend(ctx, rt, req.ID, r.reconcileCh)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (r *memCheckReconciler) MapToNothing(
|
|
ctx context.Context,
|
|
rt Runtime,
|
|
res *pbresource.Resource,
|
|
) ([]Request, error) {
|
|
r.mu.Lock()
|
|
defer r.mu.Unlock()
|
|
if !r.closed {
|
|
r.getAndSend(ctx, rt, res.Id, r.mapCh)
|
|
}
|
|
return nil, nil
|
|
}
|
|
|
|
func (*memCheckReconciler) getAndSend(ctx context.Context, rt Runtime, id *pbresource.ID, ch chan<- memCheckResult) {
|
|
var res memCheckResult
|
|
response, err := rt.Client.Read(ctx, &pbresource.ReadRequest{
|
|
Id: id,
|
|
})
|
|
res.clientGetError = err
|
|
if response != nil {
|
|
res.clientGet = response.Resource
|
|
}
|
|
|
|
res.cacheGet, res.cacheGetError = rt.Cache.Get(id.Type, "id", id)
|
|
|
|
ch <- res
|
|
}
|
|
|
|
func watchListEvents(t testutil.TestingTB, events ...*pbresource.WatchEvent) pbresource.ResourceService_WatchListClient {
|
|
t.Helper()
|
|
ctx := testutil.TestContext(t)
|
|
|
|
watchListClient := mockpbresource.NewResourceService_WatchListClient(t)
|
|
|
|
// Return the events in the specified order as soon as they are requested
|
|
for _, event := range events {
|
|
evt := event
|
|
watchListClient.EXPECT().
|
|
Recv().
|
|
RunAndReturn(func() (*pbresource.WatchEvent, error) {
|
|
return evt, nil
|
|
}).
|
|
Once()
|
|
}
|
|
|
|
watchListClient.EXPECT().
|
|
Recv().
|
|
RunAndReturn(func() (*pbresource.WatchEvent, error) {
|
|
return &pbresource.WatchEvent{
|
|
Event: &pbresource.WatchEvent_EndOfSnapshot_{
|
|
EndOfSnapshot: &pbresource.WatchEvent_EndOfSnapshot{},
|
|
},
|
|
}, nil
|
|
}).
|
|
Once()
|
|
|
|
// Now that all specified events have been exhausted we loop until the test finishes
|
|
// and the context bound to the tests lifecycle has been cancelled. This prevents getting
|
|
// any weird errors from the controller manager/runner.
|
|
watchListClient.EXPECT().
|
|
Recv().
|
|
RunAndReturn(func() (*pbresource.WatchEvent, error) {
|
|
<-ctx.Done()
|
|
return nil, ctx.Err()
|
|
}).
|
|
Maybe()
|
|
|
|
return watchListClient
|
|
}
|
|
|
|
// TestControllerRuntimeMemoryCloning mainly is testing that the runtimes
|
|
// provided to reconcilers and dependency mappers will return data from
|
|
// the resource service client and the cache that have been cloned so that
|
|
// the controller should be free to modify the data as needed.
|
|
func TestControllerRuntimeMemoryCloning(t *testing.T) {
|
|
ctx := testutil.TestContext(t)
|
|
|
|
// create some resources to use during the test
|
|
res1 := resourcetest.Resource(fakeType, "foo").
|
|
WithTenancy(resource.DefaultNamespacedTenancy()).
|
|
Build()
|
|
res2 := resourcetest.Resource(fakeV2Type, "bar").
|
|
WithTenancy(resource.DefaultNamespacedTenancy()).
|
|
Build()
|
|
|
|
// create the reconciler that will read the desired resource
|
|
// from both the resource service client and the cache client.
|
|
reconciler := newMemCheckReconciler(t)
|
|
|
|
// Create the v1 watch list client to be returned when the controller runner
|
|
// calls WatchList on the v1 testing type.
|
|
v1WatchListClientCreate := func() pbresource.ResourceService_WatchListClient {
|
|
return watchListEvents(t, &pbresource.WatchEvent{
|
|
Event: &pbresource.WatchEvent_Upsert_{
|
|
Upsert: &pbresource.WatchEvent_Upsert{
|
|
Resource: res1,
|
|
},
|
|
},
|
|
})
|
|
}
|
|
|
|
// Create the v2 watch list client to be returned when the controller runner
|
|
// calls WatchList on the v2 testing type.
|
|
v2WatchListClientCreate := func() pbresource.ResourceService_WatchListClient {
|
|
return watchListEvents(t, nil, &pbresource.WatchEvent{
|
|
Event: &pbresource.WatchEvent_Upsert_{
|
|
Upsert: &pbresource.WatchEvent_Upsert{
|
|
Resource: res2,
|
|
},
|
|
},
|
|
})
|
|
}
|
|
|
|
// Create the mock resource service client
|
|
mres := mockpbresource.NewResourceServiceClient(t)
|
|
// Setup the expectation for the controller runner to issue a WatchList
|
|
// request for the managed type (fake v2 type)
|
|
mres.EXPECT().
|
|
WatchList(mock.Anything, &pbresource.WatchListRequest{
|
|
Type: fakeV2Type,
|
|
Tenancy: &pbresource.Tenancy{
|
|
Partition: storage.Wildcard,
|
|
Namespace: storage.Wildcard,
|
|
},
|
|
}).
|
|
RunAndReturn(func(_ context.Context, _ *pbresource.WatchListRequest, _ ...grpc.CallOption) (pbresource.ResourceService_WatchListClient, error) {
|
|
return v2WatchListClientCreate(), nil
|
|
}).
|
|
Twice() // once for cache prime, once for the rest
|
|
|
|
// Setup the expectation for the controller runner to issue a WatchList
|
|
// request for the secondary Watch type (fake v1 type)
|
|
mres.EXPECT().
|
|
WatchList(mock.Anything, &pbresource.WatchListRequest{
|
|
Type: fakeType,
|
|
Tenancy: &pbresource.Tenancy{
|
|
Partition: storage.Wildcard,
|
|
Namespace: storage.Wildcard,
|
|
},
|
|
}).
|
|
RunAndReturn(func(_ context.Context, _ *pbresource.WatchListRequest, _ ...grpc.CallOption) (pbresource.ResourceService_WatchListClient, error) {
|
|
return v1WatchListClientCreate(), nil
|
|
}).
|
|
Twice() // once for cache prime, once for the rest
|
|
|
|
// The cloning resource clients will forward actual calls onto the main resource service client.
|
|
// Here we are configuring the service mock to return either of the resources depending on the
|
|
// id present in the request.
|
|
mres.EXPECT().
|
|
Read(mock.Anything, mock.Anything).
|
|
RunAndReturn(func(_ context.Context, req *pbresource.ReadRequest, opts ...grpc.CallOption) (*pbresource.ReadResponse, error) {
|
|
res := res2
|
|
if resource.EqualID(res1.Id, req.Id) {
|
|
res = res1
|
|
}
|
|
return &pbresource.ReadResponse{Resource: res}, nil
|
|
}).
|
|
Times(0)
|
|
|
|
// create the test controller
|
|
ctl := NewController("test", fakeV2Type).
|
|
WithWatch(fakeType, reconciler.MapToNothing).
|
|
WithReconciler(reconciler)
|
|
|
|
// create the controller manager and register our test controller
|
|
manager := NewManager(mres, testutil.Logger(t))
|
|
manager.Register(ctl)
|
|
|
|
// run the controller manager
|
|
manager.SetRaftLeader(true)
|
|
go manager.Run(ctx)
|
|
|
|
// All future assertions should easily be able to run within 5s although they
|
|
// should typically run a couple orders of magnitude faster.
|
|
timeLimitedCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
|
|
t.Cleanup(cancel)
|
|
|
|
// validate that the v2 resource type event was seen and that the
|
|
// cache and the resource service client return cloned resources
|
|
reconciler.checkReconcileResult(t, timeLimitedCtx, res2)
|
|
|
|
// Validate that the dependency mapper's resource and cache clients return
|
|
// cloned resources.
|
|
reconciler.checkMapResult(t, timeLimitedCtx, res1)
|
|
}
|
|
|
|
// TestRunnerSharedMemoryCache is mainly testing to ensure that resources
|
|
// within the cache are shared with the resource service and have not been
|
|
// cloned.
|
|
func TestControllerRunnerSharedMemoryCache(t *testing.T) {
|
|
ctx := testutil.TestContext(t)
|
|
|
|
// create resource to use during the test
|
|
res := resourcetest.Resource(fakeV2Type, "bar").
|
|
WithTenancy(resource.DefaultNamespacedTenancy()).
|
|
Build()
|
|
|
|
// create the reconciler that will read the desired resource
|
|
// from both the resource service client and the cache client.
|
|
reconciler := newMemCheckReconciler(t)
|
|
|
|
// Create the v2 watch list client to be returned when the controller runner
|
|
// calls WatchList on the v2 testing type.
|
|
v2WatchListClientCreate := func() pbresource.ResourceService_WatchListClient {
|
|
return watchListEvents(t, nil, &pbresource.WatchEvent{
|
|
Event: &pbresource.WatchEvent_Upsert_{
|
|
Upsert: &pbresource.WatchEvent_Upsert{
|
|
Resource: res,
|
|
},
|
|
},
|
|
})
|
|
}
|
|
|
|
// Create the mock resource service client
|
|
mres := mockpbresource.NewResourceServiceClient(t)
|
|
// Setup the expectation for the controller runner to issue a WatchList
|
|
// request for the managed type (fake v2 type)
|
|
mres.EXPECT().
|
|
WatchList(mock.Anything, &pbresource.WatchListRequest{
|
|
Type: fakeV2Type,
|
|
Tenancy: &pbresource.Tenancy{
|
|
Partition: storage.Wildcard,
|
|
Namespace: storage.Wildcard,
|
|
},
|
|
}).
|
|
RunAndReturn(func(_ context.Context, _ *pbresource.WatchListRequest, _ ...grpc.CallOption) (pbresource.ResourceService_WatchListClient, error) {
|
|
return v2WatchListClientCreate(), nil
|
|
}).
|
|
Twice() // once for cache prime, once for the rest
|
|
|
|
// The cloning resource clients will forward actual calls onto the main resource service client.
|
|
// Here we are configuring the service mock to return our singular resource always.
|
|
mres.EXPECT().
|
|
Read(mock.Anything, mock.Anything).
|
|
Return(&pbresource.ReadResponse{Resource: res}, nil).
|
|
Times(0)
|
|
|
|
// create the test controller
|
|
ctl := NewController("test", fakeV2Type).
|
|
WithReconciler(reconciler)
|
|
|
|
runner := newControllerRunner(ctl, mres, testutil.Logger(t))
|
|
go runner.run(ctx)
|
|
|
|
// Wait for reconcile to be called before we check the values in
|
|
// the cache. This will also validate that the resource service client
|
|
// and cache client given to the reconciler cloned the resource but
|
|
// that is tested more thoroughly in another test and isn't of primary
|
|
// concern here.
|
|
reconciler.checkReconcileResult(t, ctx, res)
|
|
|
|
// Now validate that the cache hold the same resource pointer as the original data
|
|
actual, err := runner.cache.Get(fakeV2Type, "id", res.Id)
|
|
require.NoError(t, err)
|
|
require.Same(t, res, actual)
|
|
}
|