consul/internal/storage/inmem/snapshot_test.go
R.B. Boyer c029b20615
v2: ensure the controller caches are fully populated before first use (#20421)
The new controller caches are initialized before the DependencyMappers or the 
Reconciler run, but importantly they are not yet populated. The expectation is that 
when the WatchList call is made to the resource service, it will send an initial 
snapshot of all resources matching a single type, and then perpetually send 
UPSERT/DELETE events afterward. This initial snapshot flows through the 
caching layer and catches it up to reflect the stored data.
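
A rough sketch of that catch-up path, using the oneof accessors described under 
"Change" below (the watchStream and resourceCache interfaces here are stand-ins 
invented for illustration, and the Resource field on the Upsert/Delete events is an 
assumption):

    // Stand-in interfaces for this sketch; the real types live in
    // internal/storage and the controller cache packages.
    type watchStream interface {
        Next(ctx context.Context) (*pbresource.WatchEvent, error)
    }

    type resourceCache interface {
        Put(res *pbresource.Resource)
        Delete(res *pbresource.Resource)
    }

    // applyEvents drains a WatchList stream into a cache so the cache catches
    // up to the stored data, then keeps it current with the live events that
    // follow the snapshot.
    func applyEvents(ctx context.Context, w watchStream, c resourceCache) error {
        for {
            event, err := w.Next(ctx)
            if err != nil {
                return err
            }
            switch {
            case event.GetUpsert() != nil:
                c.Put(event.GetUpsert().GetResource()) // assumed field on the Upsert event
            case event.GetDelete() != nil:
                c.Delete(event.GetDelete().GetResource()) // assumed field on the Delete event
            }
        }
    }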

Critically, the dependency mappers and reconcilers will race against the restoration 
of the caches on server startup or leader election. During this time it is possible that 
a mapper or reconciler will use the cache to look up a specific relationship and 
not find it. That very same reconciler may then choose to recompute some 
persisted resource and in effect rewind it to a prior computed state.

Change

- Since we are updating the behavior of the WatchList RPC, it was aligned with 
  pbsubscribe and pbpeerstream by using a protobuf oneof instead of the enum+fields option.

- The WatchList RPC now has 3 alternating response events: Upsert, Delete, and 
  EndOfSnapshot. When the initial batch of "snapshot" Upserts is sent on a new 
  watch, those operations will be followed by an EndOfSnapshot event before the 
  never-ending sequence of Upsert/Delete events begins.

- Within the Controller startup code we will launch N+1 goroutines to execute WatchList 
  queries for the watched types. The UPSERTs will be applied to the nascent cache 
  only (no mappers will execute).

- Upon witnessing the EndOfSnapshot event, those goroutines will terminate.

- When all cache priming routines complete, the normal set of N+1 long-lived 
  watch routines will launch to officially witness all events in the system using 
  the primed cache (see the sketch after this list).
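
A rough sketch of that startup sequence, with hypothetical helpers (watchedTypes, 
primeCache, runWatch) standing in for the real controller internals:

    // Illustrative only: prime every watched type's cache before the mappers
    // and reconcilers can observe it, then start the long-lived watches.
    func startWatches(ctx context.Context, watchedTypes []*pbresource.Type) {
        var wg sync.WaitGroup
        for _, typ := range watchedTypes {
            wg.Add(1)
            go func(typ *pbresource.Type) {
                defer wg.Done()
                // primeCache (hypothetical) applies the snapshot Upserts to
                // the cache and returns once it sees the EndOfSnapshot event.
                primeCache(ctx, typ)
            }(typ)
        }
        wg.Wait() // every cache is now primed

        // Only now launch the normal long-lived watch routines that feed the
        // dependency mappers and reconcilers, backed by the primed cache.
        for _, typ := range watchedTypes {
            go runWatch(ctx, typ) // hypothetical helper
        }
    }
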
2024-02-02 15:11:05 -06:00

// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1

package inmem_test

import (
	"context"
	"testing"

	"github.com/stretchr/testify/require"

	"github.com/hashicorp/consul/internal/storage"
	"github.com/hashicorp/consul/internal/storage/inmem"
	"github.com/hashicorp/consul/proto-public/pbresource"
	"github.com/hashicorp/consul/proto/private/prototest"
)

func TestSnapshotRestore(t *testing.T) {
	oldStore, err := inmem.NewStore()
	require.NoError(t, err)

	a := &pbresource.Resource{
		Id: &pbresource.ID{
			Type: &pbresource.Type{
				Group:        "mesh",
				GroupVersion: "v1",
				Kind:         "service",
			},
			Tenancy: &pbresource.Tenancy{
				Partition: "default",
				Namespace: "default",
			},
			Name: "billing",
			Uid:  "a",
		},
		Version: "1",
	}
	require.NoError(t, oldStore.WriteCAS(a, ""))

	newStore, err := inmem.NewStore()
	require.NoError(t, err)

	// Write something to the new store to make sure it gets blown away.
	b := &pbresource.Resource{
		Id: &pbresource.ID{
			Type: &pbresource.Type{
				Group:        "mesh",
				GroupVersion: "v1",
				Kind:         "service",
			},
			Tenancy: &pbresource.Tenancy{
				Partition: "default",
				Namespace: "default",
			},
			Name: "api",
			Uid:  "a",
		},
		Version: "1",
	}
	require.NoError(t, newStore.WriteCAS(b, ""))

	snap, err := oldStore.Snapshot()
	require.NoError(t, err)

	// Start a watch on the new store to make sure it gets closed.
	watch, err := newStore.WatchList(storage.UnversionedTypeFrom(b.Id.Type), b.Id.Tenancy, "")
	require.NoError(t, err)

	ctx, cancel := context.WithCancel(context.Background())
	t.Cleanup(cancel)

	// Expect the initial state on the watch.
	got, err := watch.Next(ctx)
	require.NoError(t, err)
	require.NotNil(t, got.GetUpsert())

	// Expect the end-of-snapshot event next.
	got, err = watch.Next(ctx)
	require.NoError(t, err)
	require.NotNil(t, got.GetEndOfSnapshot())

	restore, err := newStore.Restore()
	require.NoError(t, err)
	defer restore.Abort()

	for r := snap.Next(); r != nil; r = snap.Next() {
		restore.Apply(r)
	}
	restore.Commit()

	// Check that the resource we wrote to oldStore has been restored to newStore.
	rsp, err := newStore.Read(a.Id)
	require.NoError(t, err)
	prototest.AssertDeepEqual(t, a, rsp)

	// Check that the resource written to newStore was removed by the snapshot restore.
	_, err = newStore.Read(b.Id)
	require.ErrorIs(t, err, storage.ErrNotFound)

	// Check the watch has been closed.
	_, err = watch.Next(ctx)
	require.ErrorIs(t, err, storage.ErrWatchClosed)
}