From 81df781e5f62270ff4e8fad6d8a82faaf9b2287b Mon Sep 17 00:00:00 2001 From: Dan Upton Date: Mon, 27 Mar 2023 10:30:53 +0100 Subject: [PATCH] Add storage backend interface and in-memory implementation (#16538) Introduces `storage.Backend`, which will serve as the interface between the Resource Service and the underlying storage system (Raft today, but in the future, who knows!). The primary design goal of this interface is to keep its surface area small, and push as much functionality as possible into the layers above, so that new implementations can be added with little effort, and easily proven to be correct. To that end, we also provide a suite of "conformance" tests that can be run against a backend implementation to check it behaves correctly. In this commit, we introduce an initial in-memory storage backend, which is suitable for tests and when running Consul in development mode. This backend is a thin wrapper around the `Store` type, which implements a resource database using go-memdb and our internal pub/sub system. `Store` will also be used to handle reads in our Raft backend, and in the future, used as a local cache for external storage systems. --- internal/storage/conformance/conformance.go | 675 ++++++++++++++++++++ internal/storage/inmem/backend.go | 73 +++ internal/storage/inmem/backend_test.go | 28 + internal/storage/inmem/event_index.go | 33 + internal/storage/inmem/schema.go | 247 +++++++ internal/storage/inmem/store.go | 274 ++++++++ internal/storage/inmem/watch.go | 189 ++++++ internal/storage/storage.go | 307 +++++++++ proto/private/prototest/testing.go | 25 +- 9 files changed, 1847 insertions(+), 4 deletions(-) create mode 100644 internal/storage/conformance/conformance.go create mode 100644 internal/storage/inmem/backend.go create mode 100644 internal/storage/inmem/backend_test.go create mode 100644 internal/storage/inmem/event_index.go create mode 100644 internal/storage/inmem/schema.go create mode 100644 internal/storage/inmem/store.go create mode 100644 internal/storage/inmem/watch.go create mode 100644 internal/storage/storage.go diff --git a/internal/storage/conformance/conformance.go b/internal/storage/conformance/conformance.go new file mode 100644 index 0000000000..8c0e114bd0 --- /dev/null +++ b/internal/storage/conformance/conformance.go @@ -0,0 +1,675 @@ +package conformance + +import ( + "context" + "errors" + "fmt" + "math/rand" + "testing" + "time" + + "github.com/stretchr/testify/require" + "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/testing/protocmp" + + "github.com/hashicorp/consul/internal/storage" + "github.com/hashicorp/consul/proto-public/pbresource" + "github.com/hashicorp/consul/proto/private/prototest" + "github.com/hashicorp/consul/sdk/testutil/retry" +) + +type TestOptions struct { + // NewBackend will be called to construct a storage.Backend to run the tests + // against. + NewBackend func(t *testing.T) storage.Backend + + // SupportsStronglyConsistentList indicates whether the given storage backend + // supports strongly consistent list operations. + SupportsStronglyConsistentList bool +} + +// Test runs a suite of tests against a storage.Backend implementation to check +// it correctly implements our required behaviours. 
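+//
+// A typical wiring, mirroring the in-memory backend's own conformance test in
+// this package tree (the context/Run handling shown here is illustrative and
+// may differ for other backends):
+//
+//	conformance.Test(t, conformance.TestOptions{
+//		NewBackend: func(t *testing.T) storage.Backend {
+//			backend, err := inmem.NewBackend()
+//			require.NoError(t, err)
+//
+//			ctx, cancel := context.WithCancel(context.Background())
+//			t.Cleanup(cancel)
+//			go backend.Run(ctx)
+//
+//			return backend
+//		},
+//		SupportsStronglyConsistentList: true,
+//	})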
+func Test(t *testing.T, opts TestOptions) { + require.NotNil(t, opts.NewBackend, "NewBackend method is required") + + t.Run("Read", func(t *testing.T) { testRead(t, opts) }) + t.Run("CAS Write", func(t *testing.T) { testCASWrite(t, opts) }) + t.Run("CAS Delete", func(t *testing.T) { testCASDelete(t, opts) }) + t.Run("OwnerReferences", func(t *testing.T) { testOwnerReferences(t, opts) }) + + testListWatch(t, opts) +} + +func testRead(t *testing.T, opts TestOptions) { + ctx := testContext(t) + + for consistency, check := range map[storage.ReadConsistency]consistencyChecker{ + storage.EventualConsistency: eventually, + storage.StrongConsistency: immediately, + } { + t.Run(consistency.String(), func(t *testing.T) { + res := &pbresource.Resource{ + Id: &pbresource.ID{ + Type: typeAv1, + Tenancy: tenancyDefault, + Name: "web", + Uid: "a", + }, + } + + t.Run("simple", func(t *testing.T) { + backend := opts.NewBackend(t) + + _, err := backend.WriteCAS(ctx, res) + require.NoError(t, err) + + check(t, func(t testingT) { + output, err := backend.Read(ctx, consistency, res.Id) + require.NoError(t, err) + prototest.AssertDeepEqual(t, res, output, ignoreVersion) + }) + }) + + t.Run("no uid", func(t *testing.T) { + backend := opts.NewBackend(t) + + _, err := backend.WriteCAS(ctx, res) + require.NoError(t, err) + + id := clone(res.Id) + id.Uid = "" + + check(t, func(t testingT) { + output, err := backend.Read(ctx, consistency, id) + require.NoError(t, err) + prototest.AssertDeepEqual(t, res, output, ignoreVersion) + }) + }) + + t.Run("different id", func(t *testing.T) { + backend := opts.NewBackend(t) + + _, err := backend.WriteCAS(ctx, res) + require.NoError(t, err) + + id := clone(res.Id) + id.Name = "different" + + check(t, func(t testingT) { + _, err := backend.Read(ctx, consistency, id) + require.ErrorIs(t, err, storage.ErrNotFound) + }) + }) + + t.Run("different uid", func(t *testing.T) { + backend := opts.NewBackend(t) + + _, err := backend.WriteCAS(ctx, res) + require.NoError(t, err) + + id := clone(res.Id) + id.Uid = "b" + + check(t, func(t testingT) { + _, err := backend.Read(ctx, consistency, id) + require.ErrorIs(t, err, storage.ErrNotFound) + }) + }) + + t.Run("different GroupVersion", func(t *testing.T) { + backend := opts.NewBackend(t) + + _, err := backend.WriteCAS(ctx, res) + require.NoError(t, err) + + id := clone(res.Id) + id.Type = typeAv2 + + check(t, func(t testingT) { + _, err := backend.Read(ctx, consistency, id) + require.Error(t, err) + + var e storage.GroupVersionMismatchError + if errors.As(err, &e) { + require.Equal(t, id.Type, e.RequestedType) + prototest.AssertDeepEqual(t, res, e.Stored, ignoreVersion) + } else { + t.Fatalf("expected storage.GroupVersionMismatchError, got: %T", err) + } + }) + }) + }) + } + +} + +func testCASWrite(t *testing.T, opts TestOptions) { + t.Run("version-based CAS", func(t *testing.T) { + backend := opts.NewBackend(t) + ctx := testContext(t) + + v1 := &pbresource.Resource{ + Id: &pbresource.ID{ + Type: typeB, + Tenancy: tenancyDefault, + Name: "web", + Uid: "a", + }, + } + + v1.Version = "some-version" + _, err := backend.WriteCAS(ctx, v1) + require.ErrorIs(t, err, storage.ErrCASFailure) + + v1.Version = "" + v1, err = backend.WriteCAS(ctx, v1) + require.NoError(t, err) + require.NotEmpty(t, v1.Version) + + v2, err := backend.WriteCAS(ctx, v1) + require.NoError(t, err) + require.NotEmpty(t, v2.Version) + require.NotEqual(t, v1.Version, v2.Version) + + v3 := clone(v2) + v3.Version = "" + _, err = backend.WriteCAS(ctx, v3) + require.ErrorIs(t, err, 
storage.ErrCASFailure) + + v3.Version = v1.Version + _, err = backend.WriteCAS(ctx, v3) + require.ErrorIs(t, err, storage.ErrCASFailure) + }) + + t.Run("uid immutability", func(t *testing.T) { + backend := opts.NewBackend(t) + ctx := testContext(t) + + v1, err := backend.WriteCAS(ctx, &pbresource.Resource{ + Id: &pbresource.ID{ + Type: typeB, + Tenancy: tenancyDefault, + Name: "web", + Uid: "a", + }, + }) + require.NoError(t, err) + + // Uid cannot change. + v2 := clone(v1) + v2.Id.Uid = "" + _, err = backend.WriteCAS(ctx, v2) + require.Error(t, err) + + v2.Id.Uid = "b" + _, err = backend.WriteCAS(ctx, v2) + require.ErrorIs(t, err, storage.ErrWrongUid) + + v2.Id.Uid = v1.Id.Uid + v2, err = backend.WriteCAS(ctx, v2) + require.NoError(t, err) + + // Uid can change after original resource is deleted. + require.NoError(t, backend.DeleteCAS(ctx, v2.Id, v2.Version)) + + v3 := clone(v2) + v3.Id.Uid = "b" + v3.Version = "" + + _, err = backend.WriteCAS(ctx, v3) + require.NoError(t, err) + }) +} + +func testCASDelete(t *testing.T, opts TestOptions) { + t.Run("version-based CAS", func(t *testing.T) { + backend := opts.NewBackend(t) + ctx := testContext(t) + + res, err := backend.WriteCAS(ctx, &pbresource.Resource{ + Id: &pbresource.ID{ + Type: typeB, + Tenancy: tenancyDefault, + Name: "web", + Uid: "a", + }, + }) + require.NoError(t, err) + + require.ErrorIs(t, backend.DeleteCAS(ctx, res.Id, ""), storage.ErrCASFailure) + require.ErrorIs(t, backend.DeleteCAS(ctx, res.Id, "some-version"), storage.ErrCASFailure) + + require.NoError(t, backend.DeleteCAS(ctx, res.Id, res.Version)) + + eventually(t, func(t testingT) { + _, err = backend.Read(ctx, storage.EventualConsistency, res.Id) + require.ErrorIs(t, err, storage.ErrNotFound) + }) + }) + + t.Run("uid must match", func(t *testing.T) { + backend := opts.NewBackend(t) + ctx := testContext(t) + + res, err := backend.WriteCAS(ctx, &pbresource.Resource{ + Id: &pbresource.ID{ + Type: typeB, + Tenancy: tenancyDefault, + Name: "web", + Uid: "a", + }, + }) + require.NoError(t, err) + + id := clone(res.Id) + id.Uid = "b" + require.NoError(t, backend.DeleteCAS(ctx, id, res.Version)) + + eventually(t, func(t testingT) { + _, err = backend.Read(ctx, storage.EventualConsistency, res.Id) + require.NoError(t, err) + }) + }) +} + +func testListWatch(t *testing.T, opts TestOptions) { + testCases := map[string]struct { + resourceType storage.UnversionedType + tenancy *pbresource.Tenancy + namePrefix string + results []*pbresource.Resource + }{ + "simple #1": { + resourceType: storage.UnversionedTypeFrom(typeAv1), + tenancy: tenancyDefault, + namePrefix: "", + results: []*pbresource.Resource{ + seedData[0], + seedData[1], + seedData[2], + }, + }, + "simple #2": { + resourceType: storage.UnversionedTypeFrom(typeAv1), + tenancy: tenancyOther, + namePrefix: "", + results: []*pbresource.Resource{ + seedData[3], + }, + }, + "fixed tenancy, name prefix": { + resourceType: storage.UnversionedTypeFrom(typeAv1), + tenancy: tenancyDefault, + namePrefix: "a", + results: []*pbresource.Resource{ + seedData[0], + seedData[1], + }, + }, + "wildcard tenancy": { + resourceType: storage.UnversionedTypeFrom(typeAv1), + tenancy: &pbresource.Tenancy{ + Partition: storage.Wildcard, + PeerName: storage.Wildcard, + Namespace: storage.Wildcard, + }, + namePrefix: "", + results: []*pbresource.Resource{ + seedData[0], + seedData[1], + seedData[2], + seedData[3], + seedData[5], + seedData[6], + }, + }, + "fixed partition, wildcard peer, wildcard namespace": { + resourceType: 
storage.UnversionedTypeFrom(typeAv1), + tenancy: &pbresource.Tenancy{ + Partition: "default", + PeerName: storage.Wildcard, + Namespace: storage.Wildcard, + }, + namePrefix: "", + results: []*pbresource.Resource{ + seedData[0], + seedData[1], + seedData[2], + seedData[5], + seedData[6], + }, + }, + "wildcard partition, fixed peer, wildcard namespace": { + resourceType: storage.UnversionedTypeFrom(typeAv1), + tenancy: &pbresource.Tenancy{ + Partition: storage.Wildcard, + PeerName: "local", + Namespace: storage.Wildcard, + }, + namePrefix: "", + results: []*pbresource.Resource{ + seedData[0], + seedData[1], + seedData[2], + seedData[3], + seedData[5], + }, + }, + "wildcard partition, wildcard peer, fixed namespace": { + resourceType: storage.UnversionedTypeFrom(typeAv1), + tenancy: &pbresource.Tenancy{ + Partition: storage.Wildcard, + PeerName: storage.Wildcard, + Namespace: "default", + }, + namePrefix: "", + results: []*pbresource.Resource{ + seedData[0], + seedData[1], + seedData[2], + seedData[6], + }, + }, + "fixed partition, fixed peer, wildcard namespace": { + resourceType: storage.UnversionedTypeFrom(typeAv1), + tenancy: &pbresource.Tenancy{ + Partition: "default", + PeerName: "local", + Namespace: storage.Wildcard, + }, + namePrefix: "", + results: []*pbresource.Resource{ + seedData[0], + seedData[1], + seedData[2], + seedData[5], + }, + }, + "wildcard tenancy, name prefix": { + resourceType: storage.UnversionedTypeFrom(typeAv1), + tenancy: &pbresource.Tenancy{ + Partition: storage.Wildcard, + PeerName: storage.Wildcard, + Namespace: storage.Wildcard, + }, + namePrefix: "a", + results: []*pbresource.Resource{ + seedData[0], + seedData[1], + seedData[3], + seedData[5], + seedData[6], + }, + }, + } + + t.Run("List", func(t *testing.T) { + ctx := testContext(t) + + consistencyModes := map[storage.ReadConsistency]consistencyChecker{ + storage.EventualConsistency: eventually, + } + if opts.SupportsStronglyConsistentList { + consistencyModes[storage.StrongConsistency] = immediately + } + + for consistency, check := range consistencyModes { + t.Run(consistency.String(), func(t *testing.T) { + for desc, tc := range testCases { + t.Run(desc, func(t *testing.T) { + backend := opts.NewBackend(t) + for _, r := range seedData { + _, err := backend.WriteCAS(ctx, r) + require.NoError(t, err) + } + + check(t, func(t testingT) { + res, err := backend.List(ctx, consistency, tc.resourceType, tc.tenancy, tc.namePrefix) + require.NoError(t, err) + prototest.AssertElementsMatch(t, res, tc.results, ignoreVersion) + }) + }) + } + }) + } + }) + + t.Run("WatchList", func(t *testing.T) { + for desc, tc := range testCases { + t.Run(fmt.Sprintf("%s - initial snapshot", desc), func(t *testing.T) { + backend := opts.NewBackend(t) + ctx := testContext(t) + + // Write the seed data before the watch has been established. 
+ for _, r := range seedData { + _, err := backend.WriteCAS(ctx, r) + require.NoError(t, err) + } + + watch, err := backend.WatchList(ctx, tc.resourceType, tc.tenancy, tc.namePrefix) + require.NoError(t, err) + + for i := 0; i < len(tc.results); i++ { + ctx, cancel := context.WithTimeout(ctx, 5*time.Second) + t.Cleanup(cancel) + + event, err := watch.Next(ctx) + require.NoError(t, err) + + require.Equal(t, pbresource.WatchEvent_OPERATION_UPSERT, event.Operation) + prototest.AssertContainsElement(t, tc.results, event.Resource, ignoreVersion) + } + }) + + t.Run(fmt.Sprintf("%s - following events", desc), func(t *testing.T) { + backend := opts.NewBackend(t) + ctx := testContext(t) + + watch, err := backend.WatchList(ctx, tc.resourceType, tc.tenancy, tc.namePrefix) + require.NoError(t, err) + + // Write the seed data after the watch has been established. + for _, r := range seedData { + _, err := backend.WriteCAS(ctx, r) + require.NoError(t, err) + } + + for i := 0; i < len(tc.results); i++ { + ctx, cancel := context.WithTimeout(ctx, 5*time.Second) + t.Cleanup(cancel) + + event, err := watch.Next(ctx) + require.NoError(t, err) + + require.Equal(t, pbresource.WatchEvent_OPERATION_UPSERT, event.Operation) + prototest.AssertContainsElement(t, tc.results, event.Resource, ignoreVersion) + + // Check that Read implements "monotonic reads" with Watch. + readRes, err := backend.Read(ctx, storage.EventualConsistency, event.Resource.Id) + require.NoError(t, err) + prototest.AssertDeepEqual(t, event.Resource, readRes) + } + + // Delete a random resource to check we get an event. + del, err := backend.Read(ctx, storage.EventualConsistency, tc.results[rand.Intn(len(tc.results))].Id) + require.NoError(t, err) + require.NoError(t, backend.DeleteCAS(ctx, del.Id, del.Version)) + + ctx, cancel := context.WithTimeout(ctx, 5*time.Second) + t.Cleanup(cancel) + + event, err := watch.Next(ctx) + require.NoError(t, err) + + require.Equal(t, pbresource.WatchEvent_OPERATION_DELETE, event.Operation) + prototest.AssertDeepEqual(t, del, event.Resource) + + // Check that Read implements "monotonic reads" with Watch. 
+ _, err = backend.Read(ctx, storage.EventualConsistency, del.Id) + require.ErrorIs(t, err, storage.ErrNotFound) + }) + } + }) +} + +func testOwnerReferences(t *testing.T, opts TestOptions) { + backend := opts.NewBackend(t) + ctx := testContext(t) + + owner, err := backend.WriteCAS(ctx, &pbresource.Resource{ + Id: &pbresource.ID{ + Type: typeAv1, + Tenancy: tenancyDefault, + Name: "owner", + Uid: "a", + }, + }) + require.NoError(t, err) + + r1, err := backend.WriteCAS(ctx, &pbresource.Resource{ + Id: &pbresource.ID{ + Type: typeB, + Tenancy: tenancyDefault, + Name: "r1", + Uid: "a", + }, + Owner: owner.Id, + }) + require.NoError(t, err) + + r2, err := backend.WriteCAS(ctx, &pbresource.Resource{ + Id: &pbresource.ID{ + Type: typeAv2, + Tenancy: tenancyDefault, + Name: "r2", + Uid: "a", + }, + Owner: owner.Id, + }) + require.NoError(t, err) + + eventually(t, func(t testingT) { + refs, err := backend.OwnerReferences(ctx, owner.Id) + require.NoError(t, err) + prototest.AssertElementsMatch(t, refs, []*pbresource.ID{r1.Id, r2.Id}) + }) + + t.Run("references are anchored to a specific uid", func(t *testing.T) { + id := clone(owner.Id) + id.Uid = "different" + + eventually(t, func(t testingT) { + refs, err := backend.OwnerReferences(ctx, id) + require.NoError(t, err) + require.Empty(t, refs) + }) + }) + + t.Run("deleting the owner doesn't remove the references", func(t *testing.T) { + require.NoError(t, backend.DeleteCAS(ctx, owner.Id, owner.Version)) + + eventually(t, func(t testingT) { + refs, err := backend.OwnerReferences(ctx, owner.Id) + require.NoError(t, err) + prototest.AssertElementsMatch(t, refs, []*pbresource.ID{r1.Id, r2.Id}) + }) + }) + + t.Run("deleting the owned resource removes its reference", func(t *testing.T) { + require.NoError(t, backend.DeleteCAS(ctx, r2.Id, r2.Version)) + + eventually(t, func(t testingT) { + refs, err := backend.OwnerReferences(ctx, owner.Id) + require.NoError(t, err) + prototest.AssertElementsMatch(t, refs, []*pbresource.ID{r1.Id}) + }) + }) +} + +var ( + typeAv1 = &pbresource.Type{ + Group: "test", + GroupVersion: "v1", + Kind: "a", + } + typeAv2 = &pbresource.Type{ + Group: "test", + GroupVersion: "v2", + Kind: "a", + } + typeB = &pbresource.Type{ + Group: "test", + GroupVersion: "v1", + Kind: "b", + } + tenancyDefault = &pbresource.Tenancy{ + Partition: "default", + PeerName: "local", + Namespace: "default", + } + + tenancyDefaultOtherNamespace = &pbresource.Tenancy{ + Partition: "default", + PeerName: "local", + Namespace: "other", + } + tenancyDefaultOtherPeer = &pbresource.Tenancy{ + Partition: "default", + PeerName: "remote", + Namespace: "default", + } + tenancyOther = &pbresource.Tenancy{ + Partition: "billing", + PeerName: "local", + Namespace: "payments", + } + + seedData = []*pbresource.Resource{ + resource(typeAv1, tenancyDefault, "admin"), // 0 + resource(typeAv1, tenancyDefault, "api"), // 1 + resource(typeAv2, tenancyDefault, "web"), // 2 + resource(typeAv1, tenancyOther, "api"), // 3 + resource(typeB, tenancyDefault, "admin"), // 4 + resource(typeAv1, tenancyDefaultOtherNamespace, "autoscaler"), // 5 + resource(typeAv1, tenancyDefaultOtherPeer, "amplifier"), // 6 + } + + ignoreVersion = protocmp.IgnoreFields(&pbresource.Resource{}, "version") +) + +func resource(typ *pbresource.Type, ten *pbresource.Tenancy, name string) *pbresource.Resource { + return &pbresource.Resource{ + Id: &pbresource.ID{ + Type: typ, + Tenancy: ten, + Name: name, + Uid: "a", + }, + } +} + +func testContext(t *testing.T) context.Context { + ctx, cancel := 
context.WithCancel(context.Background()) + t.Cleanup(cancel) + return ctx +} + +func clone[T proto.Message](v T) T { return proto.Clone(v).(T) } + +type testingT interface { + require.TestingT + prototest.TestingT +} + +type consistencyChecker func(t *testing.T, fn func(testingT)) + +func eventually(t *testing.T, fn func(testingT)) { + t.Helper() + retry.Run(t, func(r *retry.R) { fn(r) }) +} + +func immediately(t *testing.T, fn func(testingT)) { + t.Helper() + fn(t) +} diff --git a/internal/storage/inmem/backend.go b/internal/storage/inmem/backend.go new file mode 100644 index 0000000000..d14358aa15 --- /dev/null +++ b/internal/storage/inmem/backend.go @@ -0,0 +1,73 @@ +package inmem + +import ( + "context" + "strconv" + "sync/atomic" + + "google.golang.org/protobuf/proto" + + "github.com/hashicorp/consul/internal/storage" + "github.com/hashicorp/consul/proto-public/pbresource" +) + +// NewBackend returns a purely in-memory storage backend. It's suitable for +// testing and development mode, but should NOT be used in production as it +// has no support for durable persistence, so all of your data will be lost +// when the process restarts or crashes. +// +// You must call Run before using the backend. +func NewBackend() (*Backend, error) { + store, err := NewStore() + if err != nil { + return nil, err + } + return &Backend{store: store}, nil +} + +// Backend is a purely in-memory storage backend implementation. +type Backend struct { + vsn uint64 + + store *Store +} + +// Run until the given context is canceled. This method blocks, so should be +// called in a goroutine. +func (b *Backend) Run(ctx context.Context) { b.store.Run(ctx) } + +// Read implements the storage.Backend interface. +func (b *Backend) Read(_ context.Context, _ storage.ReadConsistency, id *pbresource.ID) (*pbresource.Resource, error) { + return b.store.Read(id) +} + +// WriteCAS implements the storage.Backend interface. +func (b *Backend) WriteCAS(_ context.Context, res *pbresource.Resource) (*pbresource.Resource, error) { + stored := proto.Clone(res).(*pbresource.Resource) + stored.Version = strconv.Itoa(int(atomic.AddUint64(&b.vsn, 1))) + + if err := b.store.WriteCAS(stored, res.Version); err != nil { + return nil, err + } + return stored, nil +} + +// DeleteCAS implements the storage.Backend interface. +func (b *Backend) DeleteCAS(_ context.Context, id *pbresource.ID, version string) error { + return b.store.DeleteCAS(id, version) +} + +// List implements the storage.Backend interface. +func (b *Backend) List(_ context.Context, _ storage.ReadConsistency, resType storage.UnversionedType, tenancy *pbresource.Tenancy, namePrefix string) ([]*pbresource.Resource, error) { + return b.store.List(resType, tenancy, namePrefix) +} + +// WatchList implements the storage.Backend interface. +func (b *Backend) WatchList(_ context.Context, resType storage.UnversionedType, tenancy *pbresource.Tenancy, namePrefix string) (storage.Watch, error) { + return b.store.WatchList(resType, tenancy, namePrefix) +} + +// OwnerReferences implements the storage.Backend interface. 
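+//
+// A rough caller-side sketch of the cascading-deletion pattern this method is
+// intended to support (error handling and CAS retries elided; variable names
+// are illustrative):
+//
+//	refs, _ := backend.OwnerReferences(ctx, owner.Id)
+//	for _, id := range refs {
+//		res, err := backend.Read(ctx, storage.EventualConsistency, id)
+//		if err != nil {
+//			continue // e.g. already deleted
+//		}
+//		_ = backend.DeleteCAS(ctx, res.Id, res.Version)
+//	}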
+func (b *Backend) OwnerReferences(_ context.Context, id *pbresource.ID) ([]*pbresource.ID, error) { + return b.store.OwnerReferences(id) +} diff --git a/internal/storage/inmem/backend_test.go b/internal/storage/inmem/backend_test.go new file mode 100644 index 0000000000..fcfeb896c9 --- /dev/null +++ b/internal/storage/inmem/backend_test.go @@ -0,0 +1,28 @@ +package inmem_test + +import ( + "context" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/hashicorp/consul/internal/storage" + "github.com/hashicorp/consul/internal/storage/conformance" + "github.com/hashicorp/consul/internal/storage/inmem" +) + +func TestBackend_Conformance(t *testing.T) { + conformance.Test(t, conformance.TestOptions{ + NewBackend: func(t *testing.T) storage.Backend { + backend, err := inmem.NewBackend() + require.NoError(t, err) + + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + go backend.Run(ctx) + + return backend + }, + SupportsStronglyConsistentList: true, + }) +} diff --git a/internal/storage/inmem/event_index.go b/internal/storage/inmem/event_index.go new file mode 100644 index 0000000000..58aa05e1d2 --- /dev/null +++ b/internal/storage/inmem/event_index.go @@ -0,0 +1,33 @@ +package inmem + +import "github.com/hashicorp/go-memdb" + +type meta struct { + Key string + Value any +} + +func incrementEventIndex(tx *memdb.Txn) (uint64, error) { + idx, err := currentEventIndex(tx) + if err != nil { + return 0, err + } + + idx++ + if err := tx.Insert(tableNameMetadata, meta{Key: metaKeyEventIndex, Value: idx}); err != nil { + return 0, nil + } + return idx, nil +} + +func currentEventIndex(tx *memdb.Txn) (uint64, error) { + v, err := tx.First(tableNameMetadata, indexNameID, metaKeyEventIndex) + if err != nil { + return 0, err + } + if v == nil { + // 0 and 1 index are reserved for special use in the stream package. + return 2, nil + } + return v.(meta).Value.(uint64), nil +} diff --git a/internal/storage/inmem/schema.go b/internal/storage/inmem/schema.go new file mode 100644 index 0000000000..2f63fb8ce6 --- /dev/null +++ b/internal/storage/inmem/schema.go @@ -0,0 +1,247 @@ +package inmem + +import ( + "bytes" + "fmt" + "strings" + + "github.com/hashicorp/go-memdb" + + "github.com/hashicorp/consul/internal/storage" + "github.com/hashicorp/consul/proto-public/pbresource" +) + +const ( + tableNameMetadata = "metadata" + tableNameResources = "resources" + + indexNameID = "id" + indexNameOwner = "owner" + + metaKeyEventIndex = "index" +) + +func newDB() (*memdb.MemDB, error) { + return memdb.NewMemDB(&memdb.DBSchema{ + Tables: map[string]*memdb.TableSchema{ + tableNameMetadata: { + Name: tableNameMetadata, + Indexes: map[string]*memdb.IndexSchema{ + indexNameID: { + Name: indexNameID, + AllowMissing: false, + Unique: true, + Indexer: &memdb.StringFieldIndex{Field: "Key"}, + }, + }, + }, + tableNameResources: { + Name: tableNameResources, + Indexes: map[string]*memdb.IndexSchema{ + indexNameID: { + Name: indexNameID, + AllowMissing: false, + Unique: true, + Indexer: idIndexer{}, + }, + indexNameOwner: { + Name: indexNameOwner, + AllowMissing: true, + Unique: false, + Indexer: ownerIndexer{}, + }, + }, + }, + }, + }) +} + +// indexSeparator delimits the segments of our radix tree keys. +const indexSeparator = "\x00" + +// idIndexer implements the memdb.Indexer, memdb.SingleIndexer and +// memdb.PrefixIndexer interfaces. It is used for indexing resources +// by their IDs. +type idIndexer struct{} + +// FromArgs constructs a radix tree key from an ID for lookup. 
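+//
+// As a concrete illustration (assuming the key layout built by indexFromID
+// below), an ID for type test/v1/a named "web" in the default partition,
+// local peer, and default namespace becomes roughly:
+//
+//	test[NULL]a[NULL]default[NULL]local[NULL]default[NULL]web[NULL]
+//
+// Note that only the unversioned type (group and kind) is part of the key.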
+func (i idIndexer) FromArgs(args ...any) ([]byte, error) {
+	if l := len(args); l != 1 {
+		return nil, fmt.Errorf("expected 1 arg, got: %d", l)
+	}
+	id, ok := args[0].(*pbresource.ID)
+	if !ok {
+		return nil, fmt.Errorf("expected *pbresource.ID, got: %T", args[0])
+	}
+	return indexFromID(id, false), nil
+}
+
+// FromObject constructs a radix tree key from a Resource at write-time, or an
+// ID at delete-time.
+func (i idIndexer) FromObject(raw any) (bool, []byte, error) {
+	switch t := raw.(type) {
+	case *pbresource.ID:
+		return true, indexFromID(t, false), nil
+	case *pbresource.Resource:
+		return true, indexFromID(t.Id, false), nil
+	}
+	return false, nil, fmt.Errorf("expected *pbresource.Resource or *pbresource.ID, got: %T", raw)
+}
+
+// PrefixFromArgs constructs a radix tree key prefix from a query for listing.
+func (i idIndexer) PrefixFromArgs(args ...any) ([]byte, error) {
+	if l := len(args); l != 1 {
+		return nil, fmt.Errorf("expected 1 arg, got: %d", l)
+	}
+
+	q, ok := args[0].(query)
+	if !ok {
+		return nil, fmt.Errorf("expected query, got: %T", args[0])
+	}
+	return q.indexPrefix(), nil
+}
+
+// ownerIndexer implements the memdb.Indexer and memdb.SingleIndexer interfaces.
+// It is used for indexing resources by their owners.
+type ownerIndexer struct{}
+
+// FromArgs constructs a radix tree key from an ID for lookup.
+func (i ownerIndexer) FromArgs(args ...any) ([]byte, error) {
+	if l := len(args); l != 1 {
+		return nil, fmt.Errorf("expected 1 arg, got: %d", l)
+	}
+	id, ok := args[0].(*pbresource.ID)
+	if !ok {
+		return nil, fmt.Errorf("expected *pbresource.ID, got: %T", args[0])
+	}
+	return indexFromID(id, true), nil
+}
+
+// FromObject constructs a radix tree key from a Resource at write-time.
+func (i ownerIndexer) FromObject(raw any) (bool, []byte, error) {
+	res, ok := raw.(*pbresource.Resource)
+	if !ok {
+		return false, nil, fmt.Errorf("expected *pbresource.Resource, got: %T", raw)
+	}
+	if res.Owner == nil {
+		return false, nil, nil
+	}
+	return true, indexFromID(res.Owner, true), nil
+}
+
+func indexFromType(t storage.UnversionedType) []byte {
+	var b indexBuilder
+	b.String(t.Group)
+	b.String(t.Kind)
+	return b.Bytes()
+}
+
+func indexFromTenancy(t *pbresource.Tenancy) []byte {
+	var b indexBuilder
+	b.String(t.Partition)
+	b.String(t.PeerName)
+	b.String(t.Namespace)
+	return b.Bytes()
+}
+
+func indexFromID(id *pbresource.ID, includeUid bool) []byte {
+	var b indexBuilder
+	b.Raw(indexFromType(storage.UnversionedTypeFrom(id.Type)))
+	b.Raw(indexFromTenancy(id.Tenancy))
+	b.String(id.Name)
+	if includeUid {
+		b.String(id.Uid)
+	}
+	return b.Bytes()
+}
+
+type indexBuilder bytes.Buffer
+
+func (i *indexBuilder) Raw(v []byte) {
+	(*bytes.Buffer)(i).Write(v)
+}
+
+func (i *indexBuilder) String(s string) {
+	(*bytes.Buffer)(i).WriteString(s)
+	(*bytes.Buffer)(i).WriteString(indexSeparator)
+}
+
+func (i *indexBuilder) Bytes() []byte {
+	return (*bytes.Buffer)(i).Bytes()
+}
+
+type query struct {
+	resourceType storage.UnversionedType
+	tenancy      *pbresource.Tenancy
+	namePrefix   string
+}
+
+// indexPrefix is called by idIndexer.PrefixFromArgs to construct a radix tree
+// key prefix for list queries.
+//
+// Our radix tree keys are structured like so:
+//
+//	<type group><type kind><partition><peer name><namespace><name>
+//
+// Where each segment is followed by a NULL terminator.
+//
+// In order to handle wildcard queries, we return a prefix up to the wildcarded
+// field.
For example: +// +// Query: type={mesh,v1,service}, partition=default, peer=*, namespace=default +// Prefix: mesh[NULL]v1[NULL]service[NULL]default[NULL] +// +// Which means that we must manually apply filters after the wildcard (i.e. +// namespace in the above example) in the matches method. +func (q query) indexPrefix() []byte { + var b indexBuilder + b.Raw(indexFromType(q.resourceType)) + + if v := q.tenancy.Partition; v == storage.Wildcard { + return b.Bytes() + } else { + b.String(v) + } + + if v := q.tenancy.PeerName; v == storage.Wildcard { + return b.Bytes() + } else { + b.String(v) + } + + if v := q.tenancy.Namespace; v == storage.Wildcard { + return b.Bytes() + } else { + b.String(v) + } + + if q.namePrefix != "" { + b.Raw([]byte(q.namePrefix)) + } + + return b.Bytes() +} + +// matches applies filters that couldn't be applied by just doing a radix tree +// prefix scan, because an earlier segment of the key prefix was wildcarded. +// +// See docs on query.indexPrefix for an example. +func (q query) matches(res *pbresource.Resource) bool { + if q.tenancy.Partition != storage.Wildcard && res.Id.Tenancy.Partition != q.tenancy.Partition { + return false + } + + if q.tenancy.PeerName != storage.Wildcard && res.Id.Tenancy.PeerName != q.tenancy.PeerName { + return false + } + + if q.tenancy.Namespace != storage.Wildcard && res.Id.Tenancy.Namespace != q.tenancy.Namespace { + return false + } + + if len(q.namePrefix) != 0 && !strings.HasPrefix(res.Id.Name, q.namePrefix) { + return false + } + + return true +} diff --git a/internal/storage/inmem/store.go b/internal/storage/inmem/store.go new file mode 100644 index 0000000000..fc769a6e07 --- /dev/null +++ b/internal/storage/inmem/store.go @@ -0,0 +1,274 @@ +package inmem + +import ( + "context" + "sync" + "time" + + "github.com/hashicorp/go-memdb" + + "github.com/hashicorp/consul/agent/consul/stream" + "github.com/hashicorp/consul/internal/storage" + "github.com/hashicorp/consul/proto-public/pbresource" +) + +// Store implements an in-memory resource database using go-memdb. +// +// It can be used as a storage backend directly via the Backend type in this +// package, but also handles reads in our Raft backend, and can be used as a +// local cache when storing data in external systems (e.g. RDBMS, K/V stores). +type Store struct { + db *memdb.MemDB + pub *stream.EventPublisher + + // eventLock is used to serialize operations that result in the publishing of + // events (i.e. writes and deletes) to ensure correct ordering when there are + // concurrent writers. + // + // We cannot rely on MemDB's write lock for this, because events must be + // published *after* the transaction is committed to provide monotonic reads + // between Watch and Read calls. In other words, if we were to publish an event + // before the transaction was committed, there would be a small window of time + // where a watcher (e.g. controller) could try to Read the resource and not get + // the version they were notified about. + // + // Without this lock, it would be possible to publish events out-of-order. + eventLock sync.Mutex +} + +// NewStore creates a Store. +// +// You must call Run before using the store. +func NewStore() (*Store, error) { + db, err := newDB() + if err != nil { + return nil, err + } + + s := &Store{ + db: db, + pub: stream.NewEventPublisher(10 * time.Second), + } + s.pub.RegisterHandler(eventTopic, s.watchSnapshot, false) + + return s, nil +} + +// Run until the given context is canceled. 
This method blocks, so should be +// called in a goroutine. +func (s *Store) Run(ctx context.Context) { s.pub.Run(ctx) } + +// Read a resource using its ID. +// +// For more information, see the storage.Backend documentation. +func (s *Store) Read(id *pbresource.ID) (*pbresource.Resource, error) { + tx := s.db.Txn(false) + defer tx.Abort() + + val, err := tx.First(tableNameResources, indexNameID, id) + if err != nil { + return nil, err + } + if val == nil { + return nil, storage.ErrNotFound + } + + res := val.(*pbresource.Resource) + + // Observe the Uid if it was given. + if id.Uid != "" && res.Id.Uid != id.Uid { + return nil, storage.ErrNotFound + } + + // Let the caller know they need to upgrade/downgrade the schema version. + if id.Type.GroupVersion != res.Id.Type.GroupVersion { + return nil, storage.GroupVersionMismatchError{ + RequestedType: id.Type, + Stored: res, + } + } + + return res, nil +} + +// WriteCAS performs an atomic Compare-And-Swap (CAS) write of a resource. +// +// For more information, see the storage.Backend documentation. +func (s *Store) WriteCAS(res *pbresource.Resource, vsn string) error { + s.eventLock.Lock() + defer s.eventLock.Unlock() + + tx := s.db.Txn(true) + defer tx.Abort() + + existing, err := tx.First(tableNameResources, indexNameID, res.Id) + if err != nil { + return err + } + + // Callers provide an empty version string on initial resource creation. + if existing == nil && vsn != "" { + return storage.ErrCASFailure + } + + if existing != nil { + existingRes := existing.(*pbresource.Resource) + + // Ensure CAS semantics. + if existingRes.Version != vsn { + return storage.ErrCASFailure + } + + // Uid is immutable. + if existingRes.Id.Uid != res.Id.Uid { + return storage.ErrWrongUid + } + } + + if err := tx.Insert(tableNameResources, res); err != nil { + return err + } + + idx, err := incrementEventIndex(tx) + if err != nil { + return nil + } + tx.Commit() + + s.publishEvent(idx, pbresource.WatchEvent_OPERATION_UPSERT, res) + + return nil +} + +// DeleteCAS performs an atomic Compare-And-Swap (CAS) deletion of a resource. +// +// For more information, see the storage.Backend documentation. +func (s *Store) DeleteCAS(id *pbresource.ID, vsn string) error { + s.eventLock.Lock() + defer s.eventLock.Unlock() + + tx := s.db.Txn(true) + defer tx.Abort() + + existing, err := tx.First(tableNameResources, indexNameID, id) + if err != nil { + return err + } + + // Deleting an already deleted resource is a no-op. + if existing == nil { + return nil + } + + res := existing.(*pbresource.Resource) + + // Deleting a resource using a previous Uid is a no-op. + if id.Uid != res.Id.Uid { + return nil + } + + // Ensure CAS semantics. + if vsn != res.Version { + return storage.ErrCASFailure + } + + if err := tx.Delete(tableNameResources, id); err != nil { + return err + } + + idx, err := incrementEventIndex(tx) + if err != nil { + return nil + } + tx.Commit() + + s.publishEvent(idx, pbresource.WatchEvent_OPERATION_DELETE, res) + + return nil +} + +// List resources of the given type, tenancy, and optionally matching the given +// name prefix. +// +// For more information, see the storage.Backend documentation. 
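+//
+// A small usage sketch (the type, tenancy, and prefix values are
+// illustrative):
+//
+//	results, err := store.List(
+//		storage.UnversionedType{Group: "mesh", Kind: "service"},
+//		&pbresource.Tenancy{
+//			Partition: "default",
+//			PeerName:  "local",
+//			Namespace: storage.Wildcard,
+//		},
+//		"api-", // only resources whose names start with "api-"
+//	)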
+func (s *Store) List(typ storage.UnversionedType, ten *pbresource.Tenancy, namePrefix string) ([]*pbresource.Resource, error) { + tx := s.db.Txn(false) + defer tx.Abort() + + return listTxn(tx, query{typ, ten, namePrefix}) +} + +func listTxn(tx *memdb.Txn, q query) ([]*pbresource.Resource, error) { + iter, err := tx.Get(tableNameResources, indexNameID+"_prefix", q) + if err != nil { + return nil, err + } + + list := make([]*pbresource.Resource, 0) + for v := iter.Next(); v != nil; v = iter.Next() { + res := v.(*pbresource.Resource) + + if q.matches(res) { + list = append(list, res) + } + } + return list, nil +} + +// WatchList watches resources of the given type, tenancy, and optionally +// matching the given name prefix. +// +// For more information, see the storage.Backend documentation. +func (s *Store) WatchList(typ storage.UnversionedType, ten *pbresource.Tenancy, namePrefix string) (*Watch, error) { + // If the user specifies a wildcard, we subscribe to events for resources in + // all partitions, peers, and namespaces, and manually filter out irrelevant + // stuff (in Watch.Next). + // + // If the user gave exact tenancy values, we can subscribe to events for the + // relevant resources only, which is far more efficient. + var sub stream.Subject + if ten.Partition == storage.Wildcard || + ten.PeerName == storage.Wildcard || + ten.Namespace == storage.Wildcard { + sub = wildcardSubject{typ} + } else { + sub = tenancySubject{typ, ten} + } + + ss, err := s.pub.Subscribe(&stream.SubscribeRequest{ + Topic: eventTopic, + Subject: sub, + }) + if err != nil { + return nil, err + } + + return &Watch{ + sub: ss, + query: query{ + resourceType: typ, + tenancy: ten, + namePrefix: namePrefix, + }, + }, nil +} + +// OwnerReferences returns the IDs of resources owned by the resource with the +// given ID. +// +// For more information, see the storage.Backend documentation. +func (s *Store) OwnerReferences(id *pbresource.ID) ([]*pbresource.ID, error) { + tx := s.db.Txn(false) + defer tx.Abort() + + iter, err := tx.Get(tableNameResources, indexNameOwner, id) + if err != nil { + return nil, err + } + + var refs []*pbresource.ID + for v := iter.Next(); v != nil; v = iter.Next() { + refs = append(refs, v.(*pbresource.Resource).Id) + } + return refs, nil +} diff --git a/internal/storage/inmem/watch.go b/internal/storage/inmem/watch.go new file mode 100644 index 0000000000..5356ea7a34 --- /dev/null +++ b/internal/storage/inmem/watch.go @@ -0,0 +1,189 @@ +package inmem + +import ( + "context" + "fmt" + + "github.com/hashicorp/consul/acl" + "github.com/hashicorp/consul/agent/consul/stream" + "github.com/hashicorp/consul/internal/storage" + "github.com/hashicorp/consul/proto-public/pbresource" + "github.com/hashicorp/consul/proto/private/pbsubscribe" +) + +// Watch implements the storage.Watch interface using a stream.Subscription. +type Watch struct { + sub *stream.Subscription + query query + + // events holds excess events when they are bundled in a stream.PayloadEvents, + // until Next is called again. + events []stream.Event +} + +// Next returns the next WatchEvent, blocking until one is available. 
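+//
+// A typical consumer loop looks something like this (error handling elided):
+//
+//	watch, _ := store.WatchList(typ, tenancy, "")
+//	for {
+//		event, err := watch.Next(ctx)
+//		if err != nil {
+//			return err
+//		}
+//		switch event.Operation {
+//		case pbresource.WatchEvent_OPERATION_UPSERT:
+//			// resource was written
+//		case pbresource.WatchEvent_OPERATION_DELETE:
+//			// resource was deleted
+//		}
+//	}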
+func (w *Watch) Next(ctx context.Context) (*pbresource.WatchEvent, error) { + for { + e, err := w.nextEvent(ctx) + if err != nil { + return nil, err + } + + event := e.Payload.(eventPayload).event + if w.query.matches(event.Resource) { + return event, nil + } + } +} + +func (w *Watch) nextEvent(ctx context.Context) (*stream.Event, error) { + if len(w.events) != 0 { + event := w.events[0] + w.events = w.events[1:] + return &event, nil + } + + for { + e, err := w.sub.Next(ctx) + if err != nil { + return nil, err + } + + if e.IsFramingEvent() { + continue + } + + switch t := e.Payload.(type) { + case eventPayload: + return &e, nil + case *stream.PayloadEvents: + if len(t.Items) == 0 { + continue + } + + event, rest := t.Items[0], t.Items[1:] + w.events = rest + return &event, nil + } + } +} + +var eventTopic = stream.StringTopic("resources") + +type eventPayload struct { + subject stream.Subject + event *pbresource.WatchEvent +} + +func (p eventPayload) Subject() stream.Subject { return p.subject } + +// These methods are required by the stream.Payload interface, but we don't use them. +func (eventPayload) HasReadPermission(acl.Authorizer) bool { return false } +func (eventPayload) ToSubscriptionEvent(uint64) *pbsubscribe.Event { return nil } + +type wildcardSubject struct { + resourceType storage.UnversionedType +} + +func (s wildcardSubject) String() string { + return s.resourceType.Group + indexSeparator + + s.resourceType.Kind + indexSeparator + + storage.Wildcard +} + +type tenancySubject struct { + resourceType storage.UnversionedType + tenancy *pbresource.Tenancy +} + +func (s tenancySubject) String() string { + return s.resourceType.Group + indexSeparator + + s.resourceType.Kind + indexSeparator + + s.tenancy.Partition + indexSeparator + + s.tenancy.PeerName + indexSeparator + + s.tenancy.Namespace +} + +// publishEvent sends the event to the relevant Watches. +func (s *Store) publishEvent(idx uint64, op pbresource.WatchEvent_Operation, res *pbresource.Resource) { + id := res.Id + resourceType := storage.UnversionedTypeFrom(id.Type) + event := &pbresource.WatchEvent{Operation: op, Resource: res} + + // We publish two copies of the event: one to the tenancy-specific subject and + // another to a wildcard subject. Ideally, we'd be able to put the type in the + // topic instead and use stream.SubjectWildcard, but this requires knowing all + // types up-front (to register the snapshot handlers). + s.pub.Publish([]stream.Event{ + { + Topic: eventTopic, + Index: idx, + Payload: eventPayload{ + subject: wildcardSubject{resourceType}, + event: event, + }, + }, + { + Topic: eventTopic, + Index: idx, + Payload: eventPayload{ + subject: tenancySubject{ + resourceType: resourceType, + tenancy: id.Tenancy, + }, + event: event, + }, + }, + }) +} + +// watchSnapshot implements a stream.SnapshotFunc to provide upsert events for +// the initial state of the world. 
+func (s *Store) watchSnapshot(req stream.SubscribeRequest, snap stream.SnapshotAppender) (uint64, error) { + var q query + switch t := req.Subject.(type) { + case tenancySubject: + q.resourceType = t.resourceType + q.tenancy = t.tenancy + case wildcardSubject: + q.resourceType = t.resourceType + q.tenancy = &pbresource.Tenancy{ + Partition: storage.Wildcard, + PeerName: storage.Wildcard, + Namespace: storage.Wildcard, + } + default: + return 0, fmt.Errorf("unhandled subject type: %T", req.Subject) + } + + tx := s.db.Txn(false) + defer tx.Abort() + + idx, err := currentEventIndex(tx) + if err != nil { + return 0, err + } + + results, err := listTxn(tx, q) + if err != nil { + return 0, nil + } + + events := make([]stream.Event, len(results)) + for i, r := range results { + events[i] = stream.Event{ + Topic: eventTopic, + Index: idx, + Payload: eventPayload{ + subject: req.Subject, + event: &pbresource.WatchEvent{ + Operation: pbresource.WatchEvent_OPERATION_UPSERT, + Resource: r, + }, + }, + } + } + snap.Append(events) + + return idx, nil +} diff --git a/internal/storage/storage.go b/internal/storage/storage.go new file mode 100644 index 0000000000..3ae33d6d42 --- /dev/null +++ b/internal/storage/storage.go @@ -0,0 +1,307 @@ +package storage + +import ( + "context" + "errors" + "fmt" + + "github.com/hashicorp/consul/proto-public/pbresource" +) + +// Wildcard can be given as Tenancy fields in List and Watch calls, to enumerate +// resources across multiple partitions, peers, namespaces, etc. +const Wildcard = "*" + +var ( + // ErrNotFound indicates that the resource could not be found. + ErrNotFound = errors.New("resource not found") + + // ErrCASFailure indicates that the attempted write failed because the given + // version does not match what is currently stored. + ErrCASFailure = errors.New("CAS operation failed because the given version doesn't match what is stored") + + // ErrWrongUid indicates that the attempted write failed because the resource's + // Uid doesn't match what is currently stored (e.g. the caller is trying to + // operate on a deleted resource with the same name). + ErrWrongUid = errors.New("write failed because the given uid doesn't match what is stored") + + // ErrInconsistent indicates that the attempted write or consistent read could + // not be achieved because of a consistency or availability issue (e.g. loss of + // quorum, or when interacting with a Raft follower). + ErrInconsistent = errors.New("cannot satisfy consistency requirements") +) + +// ReadConsistency is used to specify the required consistency guarantees for +// a read operation. +type ReadConsistency int + +const ( + // EventualConsistency provides a weak set of guarantees, but is much cheaper + // than using StrongConsistency and therefore should be treated as the default. + // + // It guarantees [monotonic reads]. That is, a read will always return results + // that are as up-to-date as an earlier read, provided both happen on the same + // Consul server. But does not make any such guarantee about writes. + // + // In other words, reads won't necessarily reflect earlier writes, even when + // made against the same server. + // + // Operations that don't allow the caller to specify the consistency mode will + // hold the same guarantees as EventualConsistency, but check the method docs + // for caveats. 
+ // + // [monotonic reads]: https://jepsen.io/consistency/models/monotonic-reads + EventualConsistency ReadConsistency = iota + + // StrongConsistency provides a very strong set of guarantees but is much more + // expensive, so should be used sparingly. + // + // It guarantees full [linearizability], such that a read will always return + // the most up-to-date version of a resource, without caveat. + // + // [linearizability]: https://jepsen.io/consistency/models/linearizable + StrongConsistency +) + +// String implements the fmt.Stringer interface. +func (c ReadConsistency) String() string { + switch c { + case EventualConsistency: + return "Eventual Consistency" + case StrongConsistency: + return "Strong Consistency" + } + panic(fmt.Sprintf("unknown ReadConsistency (%d)", c)) +} + +// Backend provides the low-level storage substrate for resources. It can be +// implemented using internal (i.e. Raft+MemDB) or external (e.g. DynamoDB) +// storage systems. +// +// Refer to the method comments for details of the behaviors and invariants +// provided, which are also verified by the conformance test suite in the +// internal/storage/conformance package. +// +// Cross-cutting concerns: +// +// # UIDs +// +// Users identify resources with a name of their choice (e.g. service "billing") +// but internally, we add our own identifier in the Uid field to disambiguate +// references when resources are deleted and re-created with the same name. +// +// # GroupVersion +// +// In order to support automatic translation between schema versions, we only +// store a single version of a resource, and treat types with the same Group +// and Kind, but different GroupVersions, as equivalent. +// +// # Read-Modify-Write Patterns +// +// All writes at the storage backend level are CAS (Compare-And-Swap) operations +// where the caller must provide the resource in its entirety, with the current +// version string. +// +// Non-CAS writes should be implemented at a higher level (i.e. in the Resource +// Service) by reading the resource, applying the user's requested modifications, +// and writing it back. This allows us to ensure we're correctly carrying over +// the resource's Status and Uid, without requiring support for partial update +// or "patch" operations from external storage systems. +// +// In cases where there are concurrent interleaving writes made to a resource, +// it's likely that a CAS operation will fail, so callers may need to put their +// Read-Modify-Write cycle in a retry loop. +type Backend interface { + // Read a resource using its ID. + // + // # UIDs + // + // If id.Uid is empty, Read will ignore it and return whatever resource is + // stored with the given name. This is the desired behavior for user-initiated + // reads. + // + // If id.Uid is non-empty, Read will only return a resource if its Uid matches, + // otherwise it'll return ErrNotFound. This is the desired behaviour for reads + // initiated by controllers, which tend to operate on a specific lifetime of a + // resource. + // + // See Backend docs for more details. + // + // # GroupVersion + // + // If id.Type.GroupVersion doesn't match what is stored, Read will return a + // GroupVersionMismatchError which contains a pointer to the stored resource. + // + // See Backend docs for more details. + // + // # Consistency + // + // Read supports both EventualConsistency and StrongConsistency. 
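+	//
+	// A sketch of handling the documented outcomes (names are illustrative):
+	//
+	//	res, err := backend.Read(ctx, storage.StrongConsistency, id)
+	//	var gvm storage.GroupVersionMismatchError
+	//	switch {
+	//	case err == nil:
+	//		// use res
+	//	case errors.Is(err, storage.ErrNotFound):
+	//		// no such resource (or the requested Uid no longer exists)
+	//	case errors.As(err, &gvm):
+	//		// stored under a different GroupVersion; see gvm.Stored
+	//	}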
+ Read(ctx context.Context, consistency ReadConsistency, id *pbresource.ID) (*pbresource.Resource, error) + + // WriteCAS performs an atomic CAS (Compare-And-Swap) write of a resource based + // on its version. The given version will be compared to what is stored, and if + // it does not match, ErrCASFailure will be returned. To create new resources, + // set version to an empty string. + // + // If a write cannot be performed because of a consistency or availability + // issue (e.g. when interacting with a Raft follower, or when quorum is lost) + // ErrInconsistent will be returned. + // + // # UIDs + // + // UIDs are immutable, so if the given resource's Uid field doesn't match what + // is stored, ErrWrongUid will be returned. + // + // See Backend docs for more details. + // + // # GroupVersion + // + // Write does not validate the GroupVersion and allows you to overwrite a + // resource stored in an older form with a newer, and vice versa. + // + // See Backend docs for more details. + WriteCAS(ctx context.Context, res *pbresource.Resource) (*pbresource.Resource, error) + + // DeleteCAS performs an atomic CAS (Compare-And-Swap) deletion of a resource + // based on its version. The given version will be compared to what is stored, + // and if it does not match, ErrCASFailure will be returned. + // + // If the resource does not exist (i.e. has already been deleted) no error will + // be returned. + // + // If a deletion cannot be performed because of a consistency or availability + // issue (e.g. when interacting with a Raft follower, or when quorum is lost) + // ErrInconsistent will be returned. + // + // # UIDs + // + // If the given id's Uid does not match what is stored, the deletion will be a + // no-op (i.e. it is considered to be a different resource). + // + // See Backend docs for more details. + // + // # GroupVersion + // + // Delete does not check or refer to the GroupVersion. Resources of the same + // Group and Kind are considered equivalent, so requests to delete a resource + // using a new GroupVersion will delete a resource even if it's stored with an + // old GroupVersion. + // + // See Backend docs for more details. + DeleteCAS(ctx context.Context, id *pbresource.ID, version string) error + + // List resources of the given type, tenancy, and optionally matching the given + // name prefix. + // + // # Tenancy Wildcard + // + // In order to list resources across multiple tenancy units (e.g. namespaces) + // pass the Wildcard sentinel value in tenancy fields. + // + // # GroupVersion + // + // The resType argument contains only the Group and Kind, to reflect the fact + // that resources may be stored in a mix of old and new forms. As such, it's + // the caller's responsibility to check the resource's GroupVersion and + // translate or filter accordingly. + // + // # Consistency + // + // Generally, List only supports EventualConsistency. However, for backward + // compatability with our v1 APIs, the Raft backend supports StrongConsistency + // for list operations. + // + // When the v1 APIs finally goes away, so will this consistency parameter, so + // it should not be depended on outside of the backward compatability layer. + List(ctx context.Context, consistency ReadConsistency, resType UnversionedType, tenancy *pbresource.Tenancy, namePrefix string) ([]*pbresource.Resource, error) + + // WatchList watches resources of the given type, tenancy, and optionally + // matching the given name prefix. Upsert events for the current state of the + // world (i.e. 
existing resources that match the given filters) will be emitted + // immediately, and will be followed by delta events whenever resources are + // written or deleted. + // + // # Consistency + // + // WatchList makes no guarantees about event timeliness (e.g. an event for a + // write may not be received immediately), but it does guarantee that events + // will be emitted in the correct order. + // + // There's also a guarantee of [monotonic reads] between Read and WatchList, + // such that Read will never return data that is older than the most recent + // event you received. Note: this guarantee holds at the (in-process) storage + // backend level, only. Controllers and other users of the Resource Service API + // must remain connected to the same Consul server process to avoid receiving + // events about writes that they then cannot read. In other words, it is *not* + // linearizable. + // + // There's a similar guarantee between WatchList and OwnerReferences, see the + // OwnerReferences docs for more information. + // + // See List docs for details about Tenancy Wildcard and GroupVersion. + // + // [monotonic reads]: https://jepsen.io/consistency/models/monotonic-reads + WatchList(ctx context.Context, resType UnversionedType, tenancy *pbresource.Tenancy, namePrefix string) (Watch, error) + + // OwnerReferences returns the IDs of resources owned by the resource with the + // given ID. It is typically used to implement cascading deletion. + // + // # Consistency + // + // OwnerReferences may return stale results, but guarnantees [monotonic reads] + // with events received from WatchList. In practice, this means that if you + // learn that a resource has been deleted through a watch event, the results + // you receive from OwnerReferences will contain all references that existed + // at the time the owner was deleted. It doesn't make any guarantees about + // references that are created *after* the owner was deleted, though, so you + // must either prevent that from happening (e.g. by performing a consistent + // read of the owner in the write-path, which has its own ordering/correctness + // challenges), or by calling OwnerReferences after the expected window of + // inconsistency (e.g. deferring cascading deletion, or doing a second pass + // an hour later). + // + // [montonic reads]: https://jepsen.io/consistency/models/monotonic-reads + OwnerReferences(ctx context.Context, id *pbresource.ID) ([]*pbresource.ID, error) +} + +// Watch represents a watch on a given set of resources. Call Next to get the +// next event (i.e. upsert or deletion). +type Watch interface { + // Next returns the next event (i.e. upsert or deletion) + Next(ctx context.Context) (*pbresource.WatchEvent, error) +} + +// UnversionedType represents a pbresource.Type as it is stored without the +// GroupVersion. +type UnversionedType struct { + Group string + Kind string +} + +// UnversionedTypeFrom creates an UnversionedType from the given *pbresource.Type. +func UnversionedTypeFrom(t *pbresource.Type) UnversionedType { + return UnversionedType{ + Group: t.Group, + Kind: t.Kind, + } +} + +// GroupVersionMismatchError is returned when a resource is stored as a type +// with a different GroupVersion than was requested. +type GroupVersionMismatchError struct { + // RequestedType is the type that was requested. + RequestedType *pbresource.Type + + // Stored is the resource as it is stored. + Stored *pbresource.Resource +} + +// Error implements the error interface. 
+func (e GroupVersionMismatchError) Error() string { + return fmt.Sprintf( + "resource was requested with GroupVersion=%q, but stored with GroupVersion=%q", + e.RequestedType.GroupVersion, + e.Stored.Id.Type.GroupVersion, + ) +} diff --git a/proto/private/prototest/testing.go b/proto/private/prototest/testing.go index bf25fb0a10..1baafa2c61 100644 --- a/proto/private/prototest/testing.go +++ b/proto/private/prototest/testing.go @@ -1,13 +1,16 @@ package prototest import ( - "testing" - "github.com/google/go-cmp/cmp" "google.golang.org/protobuf/testing/protocmp" ) -func AssertDeepEqual(t testing.TB, x, y interface{}, opts ...cmp.Option) { +type TestingT interface { + Helper() + Fatalf(string, ...any) +} + +func AssertDeepEqual(t TestingT, x, y interface{}, opts ...cmp.Option) { t.Helper() opts = append(opts, protocmp.Transform()) @@ -24,7 +27,7 @@ func AssertDeepEqual(t testing.TB, x, y interface{}, opts ...cmp.Option) { // // prototest.AssertElementsMatch(t, [1, 3, 2, 3], [1, 3, 3, 2]) func AssertElementsMatch[V any]( - t testing.TB, listX, listY []V, opts ...cmp.Option, + t TestingT, listX, listY []V, opts ...cmp.Option, ) { t.Helper() @@ -73,3 +76,17 @@ func AssertElementsMatch[V any]( t.Fatalf("assertion failed: slices do not have matching elements\n--- expected\n+++ actual\n%v", diff) } } + +func AssertContainsElement[V any](t TestingT, list []V, element V, opts ...cmp.Option) { + t.Helper() + + opts = append(opts, protocmp.Transform()) + + for _, e := range list { + if cmp.Equal(e, element, opts...) { + return + } + } + + t.Fatalf("assertion failed: list does not contain element\n--- list\n%#v\n--- element: %#v", list, element) +}