mirror of
synced 2025-02-27 12:50:40 +00:00
The new controller caches are initialized before the DependencyMappers or the Reconciler run, but importantly they are not populated. The expectation is that when the WatchList call is made to the resource service it will send an initial snapshot of all resources matching a single type, and then perpetually send UPSERT/DELETE events afterward. This initial snapshot will cycle through the caching layer and will catch it up to reflect the stored data. Critically the dependency mappers and reconcilers will race against the restoration of the caches on server startup or leader election. During this time it is possible a mapper or reconciler will use the cache to lookup a specific relationship and not find it. That very same reconciler may choose to then recompute some persisted resource and in effect rewind it to a prior computed state. Change - Since we are updating the behavior of the WatchList RPC, it was aligned to match that of pbsubscribe and pbpeerstream using a protobuf oneof instead of the enum+fields option. - The WatchList rpc now has 3 alternating response events: Upsert, Delete, EndOfSnapshot. When set the initial batch of "snapshot" Upserts sent on a new watch, those operations will be followed by an EndOfSnapshot event before beginning the never-ending sequence of Upsert/Delete events. - Within the Controller startup code we will launch N+1 goroutines to execute WatchList queries for the watched types. The UPSERTs will be applied to the nascent cache only (no mappers will execute). - Upon witnessing the END operation, those goroutines will terminate. - When all cache priming routines complete, then the normal set of N+1 long lived watch routines will launch to officially witness all events in the system using the primed cached.
458 lines
13 KiB
458 lines
13 KiB
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1
package resource_test
import (
svc "github.com/hashicorp/consul/agent/grpc-external/services/resource"
svctest "github.com/hashicorp/consul/agent/grpc-external/services/resource/testing"
// TODO: Update all tests to use true/false table test for v2tenancy
func TestWatchList_InputValidation(t *testing.T) {
client := svctest.NewResourceServiceBuilder().
type testCase struct {
modFn func(*pbresource.WatchListRequest)
errContains string
testCases := map[string]testCase{
"no type": {
modFn: func(req *pbresource.WatchListRequest) { req.Type = nil },
errContains: "type is required",
"partition mixed case": {
modFn: func(req *pbresource.WatchListRequest) { req.Tenancy.Partition = "Default" },
errContains: "tenancy.partition invalid",
"partition too long": {
modFn: func(req *pbresource.WatchListRequest) {
req.Tenancy.Partition = strings.Repeat("p", resource.MaxNameLength+1)
errContains: "tenancy.partition invalid",
"namespace mixed case": {
modFn: func(req *pbresource.WatchListRequest) { req.Tenancy.Namespace = "Default" },
errContains: "tenancy.namespace invalid",
"namespace too long": {
modFn: func(req *pbresource.WatchListRequest) {
req.Tenancy.Namespace = strings.Repeat("n", resource.MaxNameLength+1)
errContains: "tenancy.namespace invalid",
"name_prefix mixed case": {
modFn: func(req *pbresource.WatchListRequest) { req.NamePrefix = "Smashing" },
errContains: "name_prefix invalid",
"partitioned type provides non-empty namespace": {
modFn: func(req *pbresource.WatchListRequest) {
req.Type = demo.TypeV1RecordLabel
req.Tenancy.Namespace = "bad"
errContains: "cannot have a namespace",
"cluster scope with non-empty partition": {
modFn: func(req *pbresource.WatchListRequest) {
req.Type = demo.TypeV1Executive
req.Tenancy = &pbresource.Tenancy{Partition: "bad"}
errContains: "cannot have a partition",
"cluster scope with non-empty namespace": {
modFn: func(req *pbresource.WatchListRequest) {
req.Type = demo.TypeV1Executive
req.Tenancy = &pbresource.Tenancy{Namespace: "bad"}
errContains: "cannot have a namespace",
for desc, tc := range testCases {
t.Run(desc, func(t *testing.T) {
req := &pbresource.WatchListRequest{
Type: demo.TypeV2Album,
Tenancy: resource.DefaultNamespacedTenancy(),
stream, err := client.WatchList(testContext(t), req)
require.NoError(t, err)
_, err = stream.Recv()
require.Error(t, err)
require.Equal(t, codes.InvalidArgument.String(), status.Code(err).String())
require.ErrorContains(t, err, tc.errContains)
func TestWatchList_TypeNotFound(t *testing.T) {
client := svctest.NewResourceServiceBuilder().Run(t)
stream, err := client.WatchList(context.Background(), &pbresource.WatchListRequest{
Type: demo.TypeV2Artist,
Tenancy: resource.DefaultNamespacedTenancy(),
NamePrefix: "",
require.NoError(t, err)
rspCh := handleResourceStream(t, stream)
err = mustGetError(t, rspCh)
require.Equal(t, codes.InvalidArgument.String(), status.Code(err).String())
require.Contains(t, err.Error(), "resource type demo.v2.Artist not registered")
func TestWatchList_GroupVersionMatches(t *testing.T) {
b := svctest.NewResourceServiceBuilder().
client := b.Run(t)
ctx := context.Background()
// create a watch
stream, err := client.WatchList(ctx, &pbresource.WatchListRequest{
Type: demo.TypeV2Artist,
Tenancy: resource.DefaultNamespacedTenancy(),
NamePrefix: "",
require.NoError(t, err)
rspCh := handleResourceStream(t, stream)
mustGetEndOfSnapshot(t, rspCh)
artist, err := demo.GenerateV2Artist()
require.NoError(t, err)
// insert and verify upsert event received
r1Resp, err := client.Write(ctx, &pbresource.WriteRequest{Resource: artist})
r1 := r1Resp.Resource
require.NoError(t, err)
rsp := mustGetResource(t, rspCh)
require.NotNil(t, rsp.GetUpsert())
prototest.AssertDeepEqual(t, r1, rsp.GetUpsert().Resource)
// update and verify upsert event received
r2 := modifyArtist(t, r1)
r2Resp, err := client.Write(ctx, &pbresource.WriteRequest{Resource: r2})
require.NoError(t, err)
r2 = r2Resp.Resource
rsp = mustGetResource(t, rspCh)
require.NotNil(t, rsp.GetUpsert())
prototest.AssertDeepEqual(t, r2, rsp.GetUpsert().Resource)
// delete and verify delete event received
_, err = client.Delete(ctx, &pbresource.DeleteRequest{Id: r2.Id, Version: r2.Version})
require.NoError(t, err)
rsp = mustGetResource(t, rspCh)
require.NotNil(t, rsp.GetDelete())
prototest.AssertDeepEqual(t, r2.Id, rsp.GetDelete().Resource.Id)
func TestWatchList_Tenancy_Defaults_And_Normalization(t *testing.T) {
// Test units of tenancy get lowercased and defaulted correctly when empty.
for desc, tc := range wildcardTenancyCases() {
t.Run(desc, func(t *testing.T) {
ctx := context.Background()
client := svctest.NewResourceServiceBuilder().
// Create a watch.
stream, err := client.WatchList(ctx, &pbresource.WatchListRequest{
Type: tc.typ,
Tenancy: tc.tenancy,
NamePrefix: "",
require.NoError(t, err)
rspCh := handleResourceStream(t, stream)
mustGetEndOfSnapshot(t, rspCh)
// Testcase will pick one of executive, recordLabel or artist based on scope of type.
recordLabel, err := demo.GenerateV1RecordLabel("looney-tunes")
require.NoError(t, err)
artist, err := demo.GenerateV2Artist()
require.NoError(t, err)
executive, err := demo.GenerateV1Executive("king-arthur", "CEO")
require.NoError(t, err)
// Create and verify upsert event received.
rlRsp, err := client.Write(ctx, &pbresource.WriteRequest{Resource: recordLabel})
require.NoError(t, err)
artistRsp, err := client.Write(ctx, &pbresource.WriteRequest{Resource: artist})
require.NoError(t, err)
executiveRsp, err := client.Write(ctx, &pbresource.WriteRequest{Resource: executive})
require.NoError(t, err)
var expected *pbresource.Resource
switch {
case resource.EqualType(tc.typ, demo.TypeV1RecordLabel):
expected = rlRsp.Resource
case resource.EqualType(tc.typ, demo.TypeV2Artist):
expected = artistRsp.Resource
case resource.EqualType(tc.typ, demo.TypeV1Executive):
expected = executiveRsp.Resource
require.Fail(t, "unsupported type", tc.typ)
rsp := mustGetResource(t, rspCh)
require.NotNil(t, rsp.GetUpsert())
prototest.AssertDeepEqual(t, expected, rsp.GetUpsert().Resource)
func TestWatchList_GroupVersionMismatch(t *testing.T) {
// Given a watch on TypeArtistV1 that only differs from TypeArtistV2 by GroupVersion
// When a resource of TypeArtistV2 is created/updated/deleted
// Then no watch events should be emitted
b := svctest.NewResourceServiceBuilder().
client := b.Run(t)
ctx := context.Background()
// create a watch for TypeArtistV1
stream, err := client.WatchList(ctx, &pbresource.WatchListRequest{
Type: demo.TypeV1Artist,
Tenancy: resource.DefaultNamespacedTenancy(),
NamePrefix: "",
require.NoError(t, err)
rspCh := handleResourceStream(t, stream)
mustGetEndOfSnapshot(t, rspCh)
artist, err := demo.GenerateV2Artist()
require.NoError(t, err)
// insert
r1Resp, err := client.Write(ctx, &pbresource.WriteRequest{Resource: artist})
require.NoError(t, err)
r1 := r1Resp.Resource
// update
r2 := clone(r1)
r2Resp, err := client.Write(ctx, &pbresource.WriteRequest{Resource: r2})
require.NoError(t, err)
r2 = r2Resp.Resource
// delete
_, err = client.Delete(ctx, &pbresource.DeleteRequest{Id: r2.Id, Version: r2.Version})
require.NoError(t, err)
// verify no events received
mustGetNoResource(t, rspCh)
// N.B. Uses key ACLs for now. See demo.RegisterTypes()
func TestWatchList_ACL_ListDenied(t *testing.T) {
// deny all
rspCh, _ := roundTripACL(t, testutils.ACLNoPermissions(t), true)
// verify key:list denied
err := mustGetError(t, rspCh)
require.Error(t, err)
require.Equal(t, codes.PermissionDenied.String(), status.Code(err).String())
require.Contains(t, err.Error(), "lacks permission 'key:list'")
// N.B. Uses key ACLs for now. See demo.RegisterTypes()
func TestWatchList_ACL_ListAllowed_ReadDenied(t *testing.T) {
// allow list, deny read
authz := AuthorizerFrom(t, `
key_prefix "resource/" { policy = "list" }
key_prefix "resource/demo.v2.Artist/" { policy = "deny" }
rspCh, _ := roundTripACL(t, authz, false)
// verify resource filtered out by key:read denied, hence no events
mustGetNoResource(t, rspCh)
// N.B. Uses key ACLs for now. See demo.RegisterTypes()
func TestWatchList_ACL_ListAllowed_ReadAllowed(t *testing.T) {
// allow list, allow read
authz := AuthorizerFrom(t, `
key_prefix "resource/" { policy = "list" }
key_prefix "resource/demo.v2.Artist/" { policy = "read" }
rspCh, artist := roundTripACL(t, authz, false)
// verify resource not filtered out by acl
event := mustGetResource(t, rspCh)
require.NotNil(t, event.GetUpsert())
prototest.AssertDeepEqual(t, artist, event.GetUpsert().Resource)
// roundtrip a WatchList which attempts to stream back a single write event
func roundTripACL(t *testing.T, authz acl.Authorizer, expectErr bool) (<-chan resourceOrError, *pbresource.Resource) {
mockACLResolver := &svc.MockACLResolver{}
mockACLResolver.On("ResolveTokenAndDefaultMeta", mock.Anything, mock.Anything, mock.Anything).
Return(resolver.Result{Authorizer: authz}, nil)
b := svctest.NewResourceServiceBuilder().
client := b.Run(t)
server := b.ServiceImpl()
artist, err := demo.GenerateV2Artist()
require.NoError(t, err)
stream, err := client.WatchList(testContext(t), &pbresource.WatchListRequest{
Type: artist.Id.Type,
Tenancy: artist.Id.Tenancy,
NamePrefix: "",
require.NoError(t, err)
rspCh := handleResourceStream(t, stream)
if !expectErr {
mustGetEndOfSnapshot(t, rspCh)
// induce single watch event
artist, err = server.Backend.WriteCAS(context.Background(), artist)
require.NoError(t, err)
// caller to make assertions on the rspCh and written artist
return rspCh, artist
func mustGetNoResource(t *testing.T, ch <-chan resourceOrError) {
select {
case rsp := <-ch:
require.NoError(t, rsp.err)
require.Nil(t, rsp.rsp, "expected nil response with no error")
case <-time.After(250 * time.Millisecond):
func mustGetEndOfSnapshot(t *testing.T, ch <-chan resourceOrError) {
event := mustGetResource(t, ch)
require.NotNil(t, event.GetEndOfSnapshot(), "expected EndOfSnapshot but got got event %T", event.GetEvent())
func mustGetResource(t *testing.T, ch <-chan resourceOrError) *pbresource.WatchEvent {
select {
case rsp := <-ch:
require.NoError(t, rsp.err)
return rsp.rsp
case <-time.After(1 * time.Second):
t.Fatal("timeout waiting for WatchListResponse")
return nil
func mustGetError(t *testing.T, ch <-chan resourceOrError) error {
select {
case rsp := <-ch:
require.Error(t, rsp.err)
return rsp.err
case <-time.After(2 * time.Second):
t.Fatal("timeout waiting for WatchListResponse")
return nil
func handleResourceStream(t *testing.T, stream pbresource.ResourceService_WatchListClient) <-chan resourceOrError {
rspCh := make(chan resourceOrError)
go func() {
for {
rsp, err := stream.Recv()
if errors.Is(err, io.EOF) ||
errors.Is(err, context.Canceled) ||
errors.Is(err, context.DeadlineExceeded) {
rspCh <- resourceOrError{
rsp: rsp,
err: err,
return rspCh
type resourceOrError struct {
rsp *pbresource.WatchEvent
err error
func TestWatchList_NoTenancy(t *testing.T) {
ctx := context.Background()
client := svctest.NewResourceServiceBuilder().
// Create a watch.
stream, err := client.WatchList(ctx, &pbresource.WatchListRequest{
Type: demo.TypeV1RecordLabel,
require.NoError(t, err)
rspCh := handleResourceStream(t, stream)
mustGetEndOfSnapshot(t, rspCh)
recordLabel, err := demo.GenerateV1RecordLabel("looney-tunes")
require.NoError(t, err)
// Create and verify upsert event received.
rsp1, err := client.Write(ctx, &pbresource.WriteRequest{Resource: recordLabel})
require.NoError(t, err)
rsp2 := mustGetResource(t, rspCh)
require.NotNil(t, rsp2.GetUpsert())
prototest.AssertDeepEqual(t, rsp1.Resource, rsp2.GetUpsert().Resource)