diff --git a/internal/storage/conformance/conformance.go b/internal/storage/conformance/conformance.go new file mode 100644 index 0000000000..8c0e114bd0 --- /dev/null +++ b/internal/storage/conformance/conformance.go @@ -0,0 +1,675 @@ +package conformance + +import ( + "context" + "errors" + "fmt" + "math/rand" + "testing" + "time" + + "github.com/stretchr/testify/require" + "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/testing/protocmp" + + "github.com/hashicorp/consul/internal/storage" + "github.com/hashicorp/consul/proto-public/pbresource" + "github.com/hashicorp/consul/proto/private/prototest" + "github.com/hashicorp/consul/sdk/testutil/retry" +) + +type TestOptions struct { + // NewBackend will be called to construct a storage.Backend to run the tests + // against. + NewBackend func(t *testing.T) storage.Backend + + // SupportsStronglyConsistentList indicates whether the given storage backend + // supports strongly consistent list operations. + SupportsStronglyConsistentList bool +} + +// Test runs a suite of tests against a storage.Backend implementation to check +// it correctly implements our required behaviours. +func Test(t *testing.T, opts TestOptions) { + require.NotNil(t, opts.NewBackend, "NewBackend method is required") + + t.Run("Read", func(t *testing.T) { testRead(t, opts) }) + t.Run("CAS Write", func(t *testing.T) { testCASWrite(t, opts) }) + t.Run("CAS Delete", func(t *testing.T) { testCASDelete(t, opts) }) + t.Run("OwnerReferences", func(t *testing.T) { testOwnerReferences(t, opts) }) + + testListWatch(t, opts) +} + +func testRead(t *testing.T, opts TestOptions) { + ctx := testContext(t) + + for consistency, check := range map[storage.ReadConsistency]consistencyChecker{ + storage.EventualConsistency: eventually, + storage.StrongConsistency: immediately, + } { + t.Run(consistency.String(), func(t *testing.T) { + res := &pbresource.Resource{ + Id: &pbresource.ID{ + Type: typeAv1, + Tenancy: tenancyDefault, + Name: "web", + Uid: "a", + }, + } + + t.Run("simple", func(t *testing.T) { + backend := opts.NewBackend(t) + + _, err := backend.WriteCAS(ctx, res) + require.NoError(t, err) + + check(t, func(t testingT) { + output, err := backend.Read(ctx, consistency, res.Id) + require.NoError(t, err) + prototest.AssertDeepEqual(t, res, output, ignoreVersion) + }) + }) + + t.Run("no uid", func(t *testing.T) { + backend := opts.NewBackend(t) + + _, err := backend.WriteCAS(ctx, res) + require.NoError(t, err) + + id := clone(res.Id) + id.Uid = "" + + check(t, func(t testingT) { + output, err := backend.Read(ctx, consistency, id) + require.NoError(t, err) + prototest.AssertDeepEqual(t, res, output, ignoreVersion) + }) + }) + + t.Run("different id", func(t *testing.T) { + backend := opts.NewBackend(t) + + _, err := backend.WriteCAS(ctx, res) + require.NoError(t, err) + + id := clone(res.Id) + id.Name = "different" + + check(t, func(t testingT) { + _, err := backend.Read(ctx, consistency, id) + require.ErrorIs(t, err, storage.ErrNotFound) + }) + }) + + t.Run("different uid", func(t *testing.T) { + backend := opts.NewBackend(t) + + _, err := backend.WriteCAS(ctx, res) + require.NoError(t, err) + + id := clone(res.Id) + id.Uid = "b" + + check(t, func(t testingT) { + _, err := backend.Read(ctx, consistency, id) + require.ErrorIs(t, err, storage.ErrNotFound) + }) + }) + + t.Run("different GroupVersion", func(t *testing.T) { + backend := opts.NewBackend(t) + + _, err := backend.WriteCAS(ctx, res) + require.NoError(t, err) + + id := clone(res.Id) + id.Type = typeAv2 + + 
check(t, func(t testingT) { + _, err := backend.Read(ctx, consistency, id) + require.Error(t, err) + + var e storage.GroupVersionMismatchError + if errors.As(err, &e) { + require.Equal(t, id.Type, e.RequestedType) + prototest.AssertDeepEqual(t, res, e.Stored, ignoreVersion) + } else { + t.Fatalf("expected storage.GroupVersionMismatchError, got: %T", err) + } + }) + }) + }) + } + +} + +func testCASWrite(t *testing.T, opts TestOptions) { + t.Run("version-based CAS", func(t *testing.T) { + backend := opts.NewBackend(t) + ctx := testContext(t) + + v1 := &pbresource.Resource{ + Id: &pbresource.ID{ + Type: typeB, + Tenancy: tenancyDefault, + Name: "web", + Uid: "a", + }, + } + + v1.Version = "some-version" + _, err := backend.WriteCAS(ctx, v1) + require.ErrorIs(t, err, storage.ErrCASFailure) + + v1.Version = "" + v1, err = backend.WriteCAS(ctx, v1) + require.NoError(t, err) + require.NotEmpty(t, v1.Version) + + v2, err := backend.WriteCAS(ctx, v1) + require.NoError(t, err) + require.NotEmpty(t, v2.Version) + require.NotEqual(t, v1.Version, v2.Version) + + v3 := clone(v2) + v3.Version = "" + _, err = backend.WriteCAS(ctx, v3) + require.ErrorIs(t, err, storage.ErrCASFailure) + + v3.Version = v1.Version + _, err = backend.WriteCAS(ctx, v3) + require.ErrorIs(t, err, storage.ErrCASFailure) + }) + + t.Run("uid immutability", func(t *testing.T) { + backend := opts.NewBackend(t) + ctx := testContext(t) + + v1, err := backend.WriteCAS(ctx, &pbresource.Resource{ + Id: &pbresource.ID{ + Type: typeB, + Tenancy: tenancyDefault, + Name: "web", + Uid: "a", + }, + }) + require.NoError(t, err) + + // Uid cannot change. + v2 := clone(v1) + v2.Id.Uid = "" + _, err = backend.WriteCAS(ctx, v2) + require.Error(t, err) + + v2.Id.Uid = "b" + _, err = backend.WriteCAS(ctx, v2) + require.ErrorIs(t, err, storage.ErrWrongUid) + + v2.Id.Uid = v1.Id.Uid + v2, err = backend.WriteCAS(ctx, v2) + require.NoError(t, err) + + // Uid can change after original resource is deleted. 
+ require.NoError(t, backend.DeleteCAS(ctx, v2.Id, v2.Version)) + + v3 := clone(v2) + v3.Id.Uid = "b" + v3.Version = "" + + _, err = backend.WriteCAS(ctx, v3) + require.NoError(t, err) + }) +} + +func testCASDelete(t *testing.T, opts TestOptions) { + t.Run("version-based CAS", func(t *testing.T) { + backend := opts.NewBackend(t) + ctx := testContext(t) + + res, err := backend.WriteCAS(ctx, &pbresource.Resource{ + Id: &pbresource.ID{ + Type: typeB, + Tenancy: tenancyDefault, + Name: "web", + Uid: "a", + }, + }) + require.NoError(t, err) + + require.ErrorIs(t, backend.DeleteCAS(ctx, res.Id, ""), storage.ErrCASFailure) + require.ErrorIs(t, backend.DeleteCAS(ctx, res.Id, "some-version"), storage.ErrCASFailure) + + require.NoError(t, backend.DeleteCAS(ctx, res.Id, res.Version)) + + eventually(t, func(t testingT) { + _, err = backend.Read(ctx, storage.EventualConsistency, res.Id) + require.ErrorIs(t, err, storage.ErrNotFound) + }) + }) + + t.Run("uid must match", func(t *testing.T) { + backend := opts.NewBackend(t) + ctx := testContext(t) + + res, err := backend.WriteCAS(ctx, &pbresource.Resource{ + Id: &pbresource.ID{ + Type: typeB, + Tenancy: tenancyDefault, + Name: "web", + Uid: "a", + }, + }) + require.NoError(t, err) + + id := clone(res.Id) + id.Uid = "b" + require.NoError(t, backend.DeleteCAS(ctx, id, res.Version)) + + eventually(t, func(t testingT) { + _, err = backend.Read(ctx, storage.EventualConsistency, res.Id) + require.NoError(t, err) + }) + }) +} + +func testListWatch(t *testing.T, opts TestOptions) { + testCases := map[string]struct { + resourceType storage.UnversionedType + tenancy *pbresource.Tenancy + namePrefix string + results []*pbresource.Resource + }{ + "simple #1": { + resourceType: storage.UnversionedTypeFrom(typeAv1), + tenancy: tenancyDefault, + namePrefix: "", + results: []*pbresource.Resource{ + seedData[0], + seedData[1], + seedData[2], + }, + }, + "simple #2": { + resourceType: storage.UnversionedTypeFrom(typeAv1), + tenancy: tenancyOther, + namePrefix: "", + results: []*pbresource.Resource{ + seedData[3], + }, + }, + "fixed tenancy, name prefix": { + resourceType: storage.UnversionedTypeFrom(typeAv1), + tenancy: tenancyDefault, + namePrefix: "a", + results: []*pbresource.Resource{ + seedData[0], + seedData[1], + }, + }, + "wildcard tenancy": { + resourceType: storage.UnversionedTypeFrom(typeAv1), + tenancy: &pbresource.Tenancy{ + Partition: storage.Wildcard, + PeerName: storage.Wildcard, + Namespace: storage.Wildcard, + }, + namePrefix: "", + results: []*pbresource.Resource{ + seedData[0], + seedData[1], + seedData[2], + seedData[3], + seedData[5], + seedData[6], + }, + }, + "fixed partition, wildcard peer, wildcard namespace": { + resourceType: storage.UnversionedTypeFrom(typeAv1), + tenancy: &pbresource.Tenancy{ + Partition: "default", + PeerName: storage.Wildcard, + Namespace: storage.Wildcard, + }, + namePrefix: "", + results: []*pbresource.Resource{ + seedData[0], + seedData[1], + seedData[2], + seedData[5], + seedData[6], + }, + }, + "wildcard partition, fixed peer, wildcard namespace": { + resourceType: storage.UnversionedTypeFrom(typeAv1), + tenancy: &pbresource.Tenancy{ + Partition: storage.Wildcard, + PeerName: "local", + Namespace: storage.Wildcard, + }, + namePrefix: "", + results: []*pbresource.Resource{ + seedData[0], + seedData[1], + seedData[2], + seedData[3], + seedData[5], + }, + }, + "wildcard partition, wildcard peer, fixed namespace": { + resourceType: storage.UnversionedTypeFrom(typeAv1), + tenancy: &pbresource.Tenancy{ + Partition: 
storage.Wildcard, + PeerName: storage.Wildcard, + Namespace: "default", + }, + namePrefix: "", + results: []*pbresource.Resource{ + seedData[0], + seedData[1], + seedData[2], + seedData[6], + }, + }, + "fixed partition, fixed peer, wildcard namespace": { + resourceType: storage.UnversionedTypeFrom(typeAv1), + tenancy: &pbresource.Tenancy{ + Partition: "default", + PeerName: "local", + Namespace: storage.Wildcard, + }, + namePrefix: "", + results: []*pbresource.Resource{ + seedData[0], + seedData[1], + seedData[2], + seedData[5], + }, + }, + "wildcard tenancy, name prefix": { + resourceType: storage.UnversionedTypeFrom(typeAv1), + tenancy: &pbresource.Tenancy{ + Partition: storage.Wildcard, + PeerName: storage.Wildcard, + Namespace: storage.Wildcard, + }, + namePrefix: "a", + results: []*pbresource.Resource{ + seedData[0], + seedData[1], + seedData[3], + seedData[5], + seedData[6], + }, + }, + } + + t.Run("List", func(t *testing.T) { + ctx := testContext(t) + + consistencyModes := map[storage.ReadConsistency]consistencyChecker{ + storage.EventualConsistency: eventually, + } + if opts.SupportsStronglyConsistentList { + consistencyModes[storage.StrongConsistency] = immediately + } + + for consistency, check := range consistencyModes { + t.Run(consistency.String(), func(t *testing.T) { + for desc, tc := range testCases { + t.Run(desc, func(t *testing.T) { + backend := opts.NewBackend(t) + for _, r := range seedData { + _, err := backend.WriteCAS(ctx, r) + require.NoError(t, err) + } + + check(t, func(t testingT) { + res, err := backend.List(ctx, consistency, tc.resourceType, tc.tenancy, tc.namePrefix) + require.NoError(t, err) + prototest.AssertElementsMatch(t, res, tc.results, ignoreVersion) + }) + }) + } + }) + } + }) + + t.Run("WatchList", func(t *testing.T) { + for desc, tc := range testCases { + t.Run(fmt.Sprintf("%s - initial snapshot", desc), func(t *testing.T) { + backend := opts.NewBackend(t) + ctx := testContext(t) + + // Write the seed data before the watch has been established. + for _, r := range seedData { + _, err := backend.WriteCAS(ctx, r) + require.NoError(t, err) + } + + watch, err := backend.WatchList(ctx, tc.resourceType, tc.tenancy, tc.namePrefix) + require.NoError(t, err) + + for i := 0; i < len(tc.results); i++ { + ctx, cancel := context.WithTimeout(ctx, 5*time.Second) + t.Cleanup(cancel) + + event, err := watch.Next(ctx) + require.NoError(t, err) + + require.Equal(t, pbresource.WatchEvent_OPERATION_UPSERT, event.Operation) + prototest.AssertContainsElement(t, tc.results, event.Resource, ignoreVersion) + } + }) + + t.Run(fmt.Sprintf("%s - following events", desc), func(t *testing.T) { + backend := opts.NewBackend(t) + ctx := testContext(t) + + watch, err := backend.WatchList(ctx, tc.resourceType, tc.tenancy, tc.namePrefix) + require.NoError(t, err) + + // Write the seed data after the watch has been established. + for _, r := range seedData { + _, err := backend.WriteCAS(ctx, r) + require.NoError(t, err) + } + + for i := 0; i < len(tc.results); i++ { + ctx, cancel := context.WithTimeout(ctx, 5*time.Second) + t.Cleanup(cancel) + + event, err := watch.Next(ctx) + require.NoError(t, err) + + require.Equal(t, pbresource.WatchEvent_OPERATION_UPSERT, event.Operation) + prototest.AssertContainsElement(t, tc.results, event.Resource, ignoreVersion) + + // Check that Read implements "monotonic reads" with Watch. 
+ readRes, err := backend.Read(ctx, storage.EventualConsistency, event.Resource.Id) + require.NoError(t, err) + prototest.AssertDeepEqual(t, event.Resource, readRes) + } + + // Delete a random resource to check we get an event. + del, err := backend.Read(ctx, storage.EventualConsistency, tc.results[rand.Intn(len(tc.results))].Id) + require.NoError(t, err) + require.NoError(t, backend.DeleteCAS(ctx, del.Id, del.Version)) + + ctx, cancel := context.WithTimeout(ctx, 5*time.Second) + t.Cleanup(cancel) + + event, err := watch.Next(ctx) + require.NoError(t, err) + + require.Equal(t, pbresource.WatchEvent_OPERATION_DELETE, event.Operation) + prototest.AssertDeepEqual(t, del, event.Resource) + + // Check that Read implements "monotonic reads" with Watch. + _, err = backend.Read(ctx, storage.EventualConsistency, del.Id) + require.ErrorIs(t, err, storage.ErrNotFound) + }) + } + }) +} + +func testOwnerReferences(t *testing.T, opts TestOptions) { + backend := opts.NewBackend(t) + ctx := testContext(t) + + owner, err := backend.WriteCAS(ctx, &pbresource.Resource{ + Id: &pbresource.ID{ + Type: typeAv1, + Tenancy: tenancyDefault, + Name: "owner", + Uid: "a", + }, + }) + require.NoError(t, err) + + r1, err := backend.WriteCAS(ctx, &pbresource.Resource{ + Id: &pbresource.ID{ + Type: typeB, + Tenancy: tenancyDefault, + Name: "r1", + Uid: "a", + }, + Owner: owner.Id, + }) + require.NoError(t, err) + + r2, err := backend.WriteCAS(ctx, &pbresource.Resource{ + Id: &pbresource.ID{ + Type: typeAv2, + Tenancy: tenancyDefault, + Name: "r2", + Uid: "a", + }, + Owner: owner.Id, + }) + require.NoError(t, err) + + eventually(t, func(t testingT) { + refs, err := backend.OwnerReferences(ctx, owner.Id) + require.NoError(t, err) + prototest.AssertElementsMatch(t, refs, []*pbresource.ID{r1.Id, r2.Id}) + }) + + t.Run("references are anchored to a specific uid", func(t *testing.T) { + id := clone(owner.Id) + id.Uid = "different" + + eventually(t, func(t testingT) { + refs, err := backend.OwnerReferences(ctx, id) + require.NoError(t, err) + require.Empty(t, refs) + }) + }) + + t.Run("deleting the owner doesn't remove the references", func(t *testing.T) { + require.NoError(t, backend.DeleteCAS(ctx, owner.Id, owner.Version)) + + eventually(t, func(t testingT) { + refs, err := backend.OwnerReferences(ctx, owner.Id) + require.NoError(t, err) + prototest.AssertElementsMatch(t, refs, []*pbresource.ID{r1.Id, r2.Id}) + }) + }) + + t.Run("deleting the owned resource removes its reference", func(t *testing.T) { + require.NoError(t, backend.DeleteCAS(ctx, r2.Id, r2.Version)) + + eventually(t, func(t testingT) { + refs, err := backend.OwnerReferences(ctx, owner.Id) + require.NoError(t, err) + prototest.AssertElementsMatch(t, refs, []*pbresource.ID{r1.Id}) + }) + }) +} + +var ( + typeAv1 = &pbresource.Type{ + Group: "test", + GroupVersion: "v1", + Kind: "a", + } + typeAv2 = &pbresource.Type{ + Group: "test", + GroupVersion: "v2", + Kind: "a", + } + typeB = &pbresource.Type{ + Group: "test", + GroupVersion: "v1", + Kind: "b", + } + tenancyDefault = &pbresource.Tenancy{ + Partition: "default", + PeerName: "local", + Namespace: "default", + } + + tenancyDefaultOtherNamespace = &pbresource.Tenancy{ + Partition: "default", + PeerName: "local", + Namespace: "other", + } + tenancyDefaultOtherPeer = &pbresource.Tenancy{ + Partition: "default", + PeerName: "remote", + Namespace: "default", + } + tenancyOther = &pbresource.Tenancy{ + Partition: "billing", + PeerName: "local", + Namespace: "payments", + } + + seedData = []*pbresource.Resource{ + 
resource(typeAv1, tenancyDefault, "admin"), // 0 + resource(typeAv1, tenancyDefault, "api"), // 1 + resource(typeAv2, tenancyDefault, "web"), // 2 + resource(typeAv1, tenancyOther, "api"), // 3 + resource(typeB, tenancyDefault, "admin"), // 4 + resource(typeAv1, tenancyDefaultOtherNamespace, "autoscaler"), // 5 + resource(typeAv1, tenancyDefaultOtherPeer, "amplifier"), // 6 + } + + ignoreVersion = protocmp.IgnoreFields(&pbresource.Resource{}, "version") +) + +func resource(typ *pbresource.Type, ten *pbresource.Tenancy, name string) *pbresource.Resource { + return &pbresource.Resource{ + Id: &pbresource.ID{ + Type: typ, + Tenancy: ten, + Name: name, + Uid: "a", + }, + } +} + +func testContext(t *testing.T) context.Context { + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + return ctx +} + +func clone[T proto.Message](v T) T { return proto.Clone(v).(T) } + +type testingT interface { + require.TestingT + prototest.TestingT +} + +type consistencyChecker func(t *testing.T, fn func(testingT)) + +func eventually(t *testing.T, fn func(testingT)) { + t.Helper() + retry.Run(t, func(r *retry.R) { fn(r) }) +} + +func immediately(t *testing.T, fn func(testingT)) { + t.Helper() + fn(t) +} diff --git a/internal/storage/inmem/backend.go b/internal/storage/inmem/backend.go new file mode 100644 index 0000000000..d14358aa15 --- /dev/null +++ b/internal/storage/inmem/backend.go @@ -0,0 +1,73 @@ +package inmem + +import ( + "context" + "strconv" + "sync/atomic" + + "google.golang.org/protobuf/proto" + + "github.com/hashicorp/consul/internal/storage" + "github.com/hashicorp/consul/proto-public/pbresource" +) + +// NewBackend returns a purely in-memory storage backend. It's suitable for +// testing and development mode, but should NOT be used in production as it +// has no support for durable persistence, so all of your data will be lost +// when the process restarts or crashes. +// +// You must call Run before using the backend. +func NewBackend() (*Backend, error) { + store, err := NewStore() + if err != nil { + return nil, err + } + return &Backend{store: store}, nil +} + +// Backend is a purely in-memory storage backend implementation. +type Backend struct { + vsn uint64 + + store *Store +} + +// Run until the given context is canceled. This method blocks, so should be +// called in a goroutine. +func (b *Backend) Run(ctx context.Context) { b.store.Run(ctx) } + +// Read implements the storage.Backend interface. +func (b *Backend) Read(_ context.Context, _ storage.ReadConsistency, id *pbresource.ID) (*pbresource.Resource, error) { + return b.store.Read(id) +} + +// WriteCAS implements the storage.Backend interface. +func (b *Backend) WriteCAS(_ context.Context, res *pbresource.Resource) (*pbresource.Resource, error) { + stored := proto.Clone(res).(*pbresource.Resource) + stored.Version = strconv.Itoa(int(atomic.AddUint64(&b.vsn, 1))) + + if err := b.store.WriteCAS(stored, res.Version); err != nil { + return nil, err + } + return stored, nil +} + +// DeleteCAS implements the storage.Backend interface. +func (b *Backend) DeleteCAS(_ context.Context, id *pbresource.ID, version string) error { + return b.store.DeleteCAS(id, version) +} + +// List implements the storage.Backend interface. +func (b *Backend) List(_ context.Context, _ storage.ReadConsistency, resType storage.UnversionedType, tenancy *pbresource.Tenancy, namePrefix string) ([]*pbresource.Resource, error) { + return b.store.List(resType, tenancy, namePrefix) +} + +// WatchList implements the storage.Backend interface. 
+func (b *Backend) WatchList(_ context.Context, resType storage.UnversionedType, tenancy *pbresource.Tenancy, namePrefix string) (storage.Watch, error) { + return b.store.WatchList(resType, tenancy, namePrefix) +} + +// OwnerReferences implements the storage.Backend interface. +func (b *Backend) OwnerReferences(_ context.Context, id *pbresource.ID) ([]*pbresource.ID, error) { + return b.store.OwnerReferences(id) +} diff --git a/internal/storage/inmem/backend_test.go b/internal/storage/inmem/backend_test.go new file mode 100644 index 0000000000..fcfeb896c9 --- /dev/null +++ b/internal/storage/inmem/backend_test.go @@ -0,0 +1,28 @@ +package inmem_test + +import ( + "context" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/hashicorp/consul/internal/storage" + "github.com/hashicorp/consul/internal/storage/conformance" + "github.com/hashicorp/consul/internal/storage/inmem" +) + +func TestBackend_Conformance(t *testing.T) { + conformance.Test(t, conformance.TestOptions{ + NewBackend: func(t *testing.T) storage.Backend { + backend, err := inmem.NewBackend() + require.NoError(t, err) + + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + go backend.Run(ctx) + + return backend + }, + SupportsStronglyConsistentList: true, + }) +} diff --git a/internal/storage/inmem/event_index.go b/internal/storage/inmem/event_index.go new file mode 100644 index 0000000000..58aa05e1d2 --- /dev/null +++ b/internal/storage/inmem/event_index.go @@ -0,0 +1,33 @@ +package inmem + +import "github.com/hashicorp/go-memdb" + +type meta struct { + Key string + Value any +} + +func incrementEventIndex(tx *memdb.Txn) (uint64, error) { + idx, err := currentEventIndex(tx) + if err != nil { + return 0, err + } + + idx++ + if err := tx.Insert(tableNameMetadata, meta{Key: metaKeyEventIndex, Value: idx}); err != nil { + return 0, nil + } + return idx, nil +} + +func currentEventIndex(tx *memdb.Txn) (uint64, error) { + v, err := tx.First(tableNameMetadata, indexNameID, metaKeyEventIndex) + if err != nil { + return 0, err + } + if v == nil { + // 0 and 1 index are reserved for special use in the stream package. + return 2, nil + } + return v.(meta).Value.(uint64), nil +} diff --git a/internal/storage/inmem/schema.go b/internal/storage/inmem/schema.go new file mode 100644 index 0000000000..2f63fb8ce6 --- /dev/null +++ b/internal/storage/inmem/schema.go @@ -0,0 +1,247 @@ +package inmem + +import ( + "bytes" + "fmt" + "strings" + + "github.com/hashicorp/go-memdb" + + "github.com/hashicorp/consul/internal/storage" + "github.com/hashicorp/consul/proto-public/pbresource" +) + +const ( + tableNameMetadata = "metadata" + tableNameResources = "resources" + + indexNameID = "id" + indexNameOwner = "owner" + + metaKeyEventIndex = "index" +) + +func newDB() (*memdb.MemDB, error) { + return memdb.NewMemDB(&memdb.DBSchema{ + Tables: map[string]*memdb.TableSchema{ + tableNameMetadata: { + Name: tableNameMetadata, + Indexes: map[string]*memdb.IndexSchema{ + indexNameID: { + Name: indexNameID, + AllowMissing: false, + Unique: true, + Indexer: &memdb.StringFieldIndex{Field: "Key"}, + }, + }, + }, + tableNameResources: { + Name: tableNameResources, + Indexes: map[string]*memdb.IndexSchema{ + indexNameID: { + Name: indexNameID, + AllowMissing: false, + Unique: true, + Indexer: idIndexer{}, + }, + indexNameOwner: { + Name: indexNameOwner, + AllowMissing: true, + Unique: false, + Indexer: ownerIndexer{}, + }, + }, + }, + }, + }) +} + +// indexSeparator delimits the segments of our radix tree keys. 
+const indexSeparator = "\x00" + +// idIndexer implements the memdb.Indexer, memdb.SingleIndexer and +// memdb.PrefixIndexer interfaces. It is used for indexing resources +// by their IDs. +type idIndexer struct{} + +// FromArgs constructs a radix tree key from an ID for lookup. +func (i idIndexer) FromArgs(args ...any) ([]byte, error) { + if l := len(args); l != 1 { + return nil, fmt.Errorf("expected 1 arg, got: %d", l) + } + id, ok := args[0].(*pbresource.ID) + if !ok { + return nil, fmt.Errorf("expected *pbresource.ID, got: %T", args[0]) + } + return indexFromID(id, false), nil +} + +// FromObject constructs a radix tree key from a Resource at write-time, or an +// ID at delete-time. +func (i idIndexer) FromObject(raw any) (bool, []byte, error) { + switch t := raw.(type) { + case *pbresource.ID: + return true, indexFromID(t, false), nil + case *pbresource.Resource: + return true, indexFromID(t.Id, false), nil + } + return false, nil, fmt.Errorf("expected *pbresource.Resource or *pbresource.ID, got: %T", raw) +} + +// PrefixFromArgs constructs a radix tree key prefix from a query for listing. +func (i idIndexer) PrefixFromArgs(args ...any) ([]byte, error) { + if l := len(args); l != 1 { + return nil, fmt.Errorf("expected 1 arg, got: %d", l) + } + + q, ok := args[0].(query) + if !ok { + return nil, fmt.Errorf("expected query, got: %T", args[0]) + } + return q.indexPrefix(), nil +} + +// ownerIndexer implements the memdb.Indexer and memdb.SingleIndexer interfaces. +// It is used for indexing resources by their owners. +type ownerIndexer struct{} + +// FromArgs constructs a radix tree key from an ID for lookup. +func (i ownerIndexer) FromArgs(args ...any) ([]byte, error) { + if l := len(args); l != 1 { + return nil, fmt.Errorf("expected 1 arg, got: %d", l) + } + id, ok := args[0].(*pbresource.ID) + if !ok { + return nil, fmt.Errorf("expected *pbresource.ID, got: %T", args[0]) + } + return indexFromID(id, true), nil +} + +// FromObject constructs a radix key tree from a Resource at write-time. +func (i ownerIndexer) FromObject(raw any) (bool, []byte, error) { + res, ok := raw.(*pbresource.Resource) + if !ok { + return false, nil, fmt.Errorf("expected *pbresource.Resource, got: %T", raw) + } + if res.Owner == nil { + return false, nil, nil + } + return true, indexFromID(res.Owner, true), nil +} + +func indexFromType(t storage.UnversionedType) []byte { + var b indexBuilder + b.String(t.Group) + b.String(t.Kind) + return b.Bytes() +} + +func indexFromTenancy(t *pbresource.Tenancy) []byte { + var b indexBuilder + b.String(t.Partition) + b.String(t.PeerName) + b.String(t.Namespace) + return b.Bytes() +} + +func indexFromID(id *pbresource.ID, includeUid bool) []byte { + var b indexBuilder + b.Raw(indexFromType(storage.UnversionedTypeFrom(id.Type))) + b.Raw(indexFromTenancy(id.Tenancy)) + b.String(id.Name) + if includeUid { + b.String(id.Uid) + } + return b.Bytes() +} + +type indexBuilder bytes.Buffer + +func (i *indexBuilder) Raw(v []byte) { + (*bytes.Buffer)(i).Write(v) +} + +func (i *indexBuilder) String(s string) { + (*bytes.Buffer)(i).WriteString(s) + (*bytes.Buffer)(i).WriteString(indexSeparator) +} + +func (i *indexBuilder) Bytes() []byte { + return (*bytes.Buffer)(i).Bytes() +} + +type query struct { + resourceType storage.UnversionedType + tenancy *pbresource.Tenancy + namePrefix string +} + +// indexPrefix is called by idIndexer.PrefixFromArgs to construct a radix tree +// key prefix for list queries. 
+// +// Our radix tree keys are structured like so: +// +// +// +// Where each segment is followed by a NULL terminator. +// +// In order to handle wildcard queries, we return a prefix up to the wildcarded +// field. For example: +// +// Query: type={mesh,v1,service}, partition=default, peer=*, namespace=default +// Prefix: mesh[NULL]v1[NULL]service[NULL]default[NULL] +// +// Which means that we must manually apply filters after the wildcard (i.e. +// namespace in the above example) in the matches method. +func (q query) indexPrefix() []byte { + var b indexBuilder + b.Raw(indexFromType(q.resourceType)) + + if v := q.tenancy.Partition; v == storage.Wildcard { + return b.Bytes() + } else { + b.String(v) + } + + if v := q.tenancy.PeerName; v == storage.Wildcard { + return b.Bytes() + } else { + b.String(v) + } + + if v := q.tenancy.Namespace; v == storage.Wildcard { + return b.Bytes() + } else { + b.String(v) + } + + if q.namePrefix != "" { + b.Raw([]byte(q.namePrefix)) + } + + return b.Bytes() +} + +// matches applies filters that couldn't be applied by just doing a radix tree +// prefix scan, because an earlier segment of the key prefix was wildcarded. +// +// See docs on query.indexPrefix for an example. +func (q query) matches(res *pbresource.Resource) bool { + if q.tenancy.Partition != storage.Wildcard && res.Id.Tenancy.Partition != q.tenancy.Partition { + return false + } + + if q.tenancy.PeerName != storage.Wildcard && res.Id.Tenancy.PeerName != q.tenancy.PeerName { + return false + } + + if q.tenancy.Namespace != storage.Wildcard && res.Id.Tenancy.Namespace != q.tenancy.Namespace { + return false + } + + if len(q.namePrefix) != 0 && !strings.HasPrefix(res.Id.Name, q.namePrefix) { + return false + } + + return true +} diff --git a/internal/storage/inmem/store.go b/internal/storage/inmem/store.go new file mode 100644 index 0000000000..fc769a6e07 --- /dev/null +++ b/internal/storage/inmem/store.go @@ -0,0 +1,274 @@ +package inmem + +import ( + "context" + "sync" + "time" + + "github.com/hashicorp/go-memdb" + + "github.com/hashicorp/consul/agent/consul/stream" + "github.com/hashicorp/consul/internal/storage" + "github.com/hashicorp/consul/proto-public/pbresource" +) + +// Store implements an in-memory resource database using go-memdb. +// +// It can be used as a storage backend directly via the Backend type in this +// package, but also handles reads in our Raft backend, and can be used as a +// local cache when storing data in external systems (e.g. RDBMS, K/V stores). +type Store struct { + db *memdb.MemDB + pub *stream.EventPublisher + + // eventLock is used to serialize operations that result in the publishing of + // events (i.e. writes and deletes) to ensure correct ordering when there are + // concurrent writers. + // + // We cannot rely on MemDB's write lock for this, because events must be + // published *after* the transaction is committed to provide monotonic reads + // between Watch and Read calls. In other words, if we were to publish an event + // before the transaction was committed, there would be a small window of time + // where a watcher (e.g. controller) could try to Read the resource and not get + // the version they were notified about. + // + // Without this lock, it would be possible to publish events out-of-order. + eventLock sync.Mutex +} + +// NewStore creates a Store. +// +// You must call Run before using the store. 
+func NewStore() (*Store, error) { + db, err := newDB() + if err != nil { + return nil, err + } + + s := &Store{ + db: db, + pub: stream.NewEventPublisher(10 * time.Second), + } + s.pub.RegisterHandler(eventTopic, s.watchSnapshot, false) + + return s, nil +} + +// Run until the given context is canceled. This method blocks, so should be +// called in a goroutine. +func (s *Store) Run(ctx context.Context) { s.pub.Run(ctx) } + +// Read a resource using its ID. +// +// For more information, see the storage.Backend documentation. +func (s *Store) Read(id *pbresource.ID) (*pbresource.Resource, error) { + tx := s.db.Txn(false) + defer tx.Abort() + + val, err := tx.First(tableNameResources, indexNameID, id) + if err != nil { + return nil, err + } + if val == nil { + return nil, storage.ErrNotFound + } + + res := val.(*pbresource.Resource) + + // Observe the Uid if it was given. + if id.Uid != "" && res.Id.Uid != id.Uid { + return nil, storage.ErrNotFound + } + + // Let the caller know they need to upgrade/downgrade the schema version. + if id.Type.GroupVersion != res.Id.Type.GroupVersion { + return nil, storage.GroupVersionMismatchError{ + RequestedType: id.Type, + Stored: res, + } + } + + return res, nil +} + +// WriteCAS performs an atomic Compare-And-Swap (CAS) write of a resource. +// +// For more information, see the storage.Backend documentation. +func (s *Store) WriteCAS(res *pbresource.Resource, vsn string) error { + s.eventLock.Lock() + defer s.eventLock.Unlock() + + tx := s.db.Txn(true) + defer tx.Abort() + + existing, err := tx.First(tableNameResources, indexNameID, res.Id) + if err != nil { + return err + } + + // Callers provide an empty version string on initial resource creation. + if existing == nil && vsn != "" { + return storage.ErrCASFailure + } + + if existing != nil { + existingRes := existing.(*pbresource.Resource) + + // Ensure CAS semantics. + if existingRes.Version != vsn { + return storage.ErrCASFailure + } + + // Uid is immutable. + if existingRes.Id.Uid != res.Id.Uid { + return storage.ErrWrongUid + } + } + + if err := tx.Insert(tableNameResources, res); err != nil { + return err + } + + idx, err := incrementEventIndex(tx) + if err != nil { + return nil + } + tx.Commit() + + s.publishEvent(idx, pbresource.WatchEvent_OPERATION_UPSERT, res) + + return nil +} + +// DeleteCAS performs an atomic Compare-And-Swap (CAS) deletion of a resource. +// +// For more information, see the storage.Backend documentation. +func (s *Store) DeleteCAS(id *pbresource.ID, vsn string) error { + s.eventLock.Lock() + defer s.eventLock.Unlock() + + tx := s.db.Txn(true) + defer tx.Abort() + + existing, err := tx.First(tableNameResources, indexNameID, id) + if err != nil { + return err + } + + // Deleting an already deleted resource is a no-op. + if existing == nil { + return nil + } + + res := existing.(*pbresource.Resource) + + // Deleting a resource using a previous Uid is a no-op. + if id.Uid != res.Id.Uid { + return nil + } + + // Ensure CAS semantics. + if vsn != res.Version { + return storage.ErrCASFailure + } + + if err := tx.Delete(tableNameResources, id); err != nil { + return err + } + + idx, err := incrementEventIndex(tx) + if err != nil { + return nil + } + tx.Commit() + + s.publishEvent(idx, pbresource.WatchEvent_OPERATION_DELETE, res) + + return nil +} + +// List resources of the given type, tenancy, and optionally matching the given +// name prefix. +// +// For more information, see the storage.Backend documentation. 
+func (s *Store) List(typ storage.UnversionedType, ten *pbresource.Tenancy, namePrefix string) ([]*pbresource.Resource, error) { + tx := s.db.Txn(false) + defer tx.Abort() + + return listTxn(tx, query{typ, ten, namePrefix}) +} + +func listTxn(tx *memdb.Txn, q query) ([]*pbresource.Resource, error) { + iter, err := tx.Get(tableNameResources, indexNameID+"_prefix", q) + if err != nil { + return nil, err + } + + list := make([]*pbresource.Resource, 0) + for v := iter.Next(); v != nil; v = iter.Next() { + res := v.(*pbresource.Resource) + + if q.matches(res) { + list = append(list, res) + } + } + return list, nil +} + +// WatchList watches resources of the given type, tenancy, and optionally +// matching the given name prefix. +// +// For more information, see the storage.Backend documentation. +func (s *Store) WatchList(typ storage.UnversionedType, ten *pbresource.Tenancy, namePrefix string) (*Watch, error) { + // If the user specifies a wildcard, we subscribe to events for resources in + // all partitions, peers, and namespaces, and manually filter out irrelevant + // stuff (in Watch.Next). + // + // If the user gave exact tenancy values, we can subscribe to events for the + // relevant resources only, which is far more efficient. + var sub stream.Subject + if ten.Partition == storage.Wildcard || + ten.PeerName == storage.Wildcard || + ten.Namespace == storage.Wildcard { + sub = wildcardSubject{typ} + } else { + sub = tenancySubject{typ, ten} + } + + ss, err := s.pub.Subscribe(&stream.SubscribeRequest{ + Topic: eventTopic, + Subject: sub, + }) + if err != nil { + return nil, err + } + + return &Watch{ + sub: ss, + query: query{ + resourceType: typ, + tenancy: ten, + namePrefix: namePrefix, + }, + }, nil +} + +// OwnerReferences returns the IDs of resources owned by the resource with the +// given ID. +// +// For more information, see the storage.Backend documentation. +func (s *Store) OwnerReferences(id *pbresource.ID) ([]*pbresource.ID, error) { + tx := s.db.Txn(false) + defer tx.Abort() + + iter, err := tx.Get(tableNameResources, indexNameOwner, id) + if err != nil { + return nil, err + } + + var refs []*pbresource.ID + for v := iter.Next(); v != nil; v = iter.Next() { + refs = append(refs, v.(*pbresource.Resource).Id) + } + return refs, nil +} diff --git a/internal/storage/inmem/watch.go b/internal/storage/inmem/watch.go new file mode 100644 index 0000000000..5356ea7a34 --- /dev/null +++ b/internal/storage/inmem/watch.go @@ -0,0 +1,189 @@ +package inmem + +import ( + "context" + "fmt" + + "github.com/hashicorp/consul/acl" + "github.com/hashicorp/consul/agent/consul/stream" + "github.com/hashicorp/consul/internal/storage" + "github.com/hashicorp/consul/proto-public/pbresource" + "github.com/hashicorp/consul/proto/private/pbsubscribe" +) + +// Watch implements the storage.Watch interface using a stream.Subscription. +type Watch struct { + sub *stream.Subscription + query query + + // events holds excess events when they are bundled in a stream.PayloadEvents, + // until Next is called again. + events []stream.Event +} + +// Next returns the next WatchEvent, blocking until one is available. 
+func (w *Watch) Next(ctx context.Context) (*pbresource.WatchEvent, error) { + for { + e, err := w.nextEvent(ctx) + if err != nil { + return nil, err + } + + event := e.Payload.(eventPayload).event + if w.query.matches(event.Resource) { + return event, nil + } + } +} + +func (w *Watch) nextEvent(ctx context.Context) (*stream.Event, error) { + if len(w.events) != 0 { + event := w.events[0] + w.events = w.events[1:] + return &event, nil + } + + for { + e, err := w.sub.Next(ctx) + if err != nil { + return nil, err + } + + if e.IsFramingEvent() { + continue + } + + switch t := e.Payload.(type) { + case eventPayload: + return &e, nil + case *stream.PayloadEvents: + if len(t.Items) == 0 { + continue + } + + event, rest := t.Items[0], t.Items[1:] + w.events = rest + return &event, nil + } + } +} + +var eventTopic = stream.StringTopic("resources") + +type eventPayload struct { + subject stream.Subject + event *pbresource.WatchEvent +} + +func (p eventPayload) Subject() stream.Subject { return p.subject } + +// These methods are required by the stream.Payload interface, but we don't use them. +func (eventPayload) HasReadPermission(acl.Authorizer) bool { return false } +func (eventPayload) ToSubscriptionEvent(uint64) *pbsubscribe.Event { return nil } + +type wildcardSubject struct { + resourceType storage.UnversionedType +} + +func (s wildcardSubject) String() string { + return s.resourceType.Group + indexSeparator + + s.resourceType.Kind + indexSeparator + + storage.Wildcard +} + +type tenancySubject struct { + resourceType storage.UnversionedType + tenancy *pbresource.Tenancy +} + +func (s tenancySubject) String() string { + return s.resourceType.Group + indexSeparator + + s.resourceType.Kind + indexSeparator + + s.tenancy.Partition + indexSeparator + + s.tenancy.PeerName + indexSeparator + + s.tenancy.Namespace +} + +// publishEvent sends the event to the relevant Watches. +func (s *Store) publishEvent(idx uint64, op pbresource.WatchEvent_Operation, res *pbresource.Resource) { + id := res.Id + resourceType := storage.UnversionedTypeFrom(id.Type) + event := &pbresource.WatchEvent{Operation: op, Resource: res} + + // We publish two copies of the event: one to the tenancy-specific subject and + // another to a wildcard subject. Ideally, we'd be able to put the type in the + // topic instead and use stream.SubjectWildcard, but this requires knowing all + // types up-front (to register the snapshot handlers). + s.pub.Publish([]stream.Event{ + { + Topic: eventTopic, + Index: idx, + Payload: eventPayload{ + subject: wildcardSubject{resourceType}, + event: event, + }, + }, + { + Topic: eventTopic, + Index: idx, + Payload: eventPayload{ + subject: tenancySubject{ + resourceType: resourceType, + tenancy: id.Tenancy, + }, + event: event, + }, + }, + }) +} + +// watchSnapshot implements a stream.SnapshotFunc to provide upsert events for +// the initial state of the world. 
+func (s *Store) watchSnapshot(req stream.SubscribeRequest, snap stream.SnapshotAppender) (uint64, error) { + var q query + switch t := req.Subject.(type) { + case tenancySubject: + q.resourceType = t.resourceType + q.tenancy = t.tenancy + case wildcardSubject: + q.resourceType = t.resourceType + q.tenancy = &pbresource.Tenancy{ + Partition: storage.Wildcard, + PeerName: storage.Wildcard, + Namespace: storage.Wildcard, + } + default: + return 0, fmt.Errorf("unhandled subject type: %T", req.Subject) + } + + tx := s.db.Txn(false) + defer tx.Abort() + + idx, err := currentEventIndex(tx) + if err != nil { + return 0, err + } + + results, err := listTxn(tx, q) + if err != nil { + return 0, nil + } + + events := make([]stream.Event, len(results)) + for i, r := range results { + events[i] = stream.Event{ + Topic: eventTopic, + Index: idx, + Payload: eventPayload{ + subject: req.Subject, + event: &pbresource.WatchEvent{ + Operation: pbresource.WatchEvent_OPERATION_UPSERT, + Resource: r, + }, + }, + } + } + snap.Append(events) + + return idx, nil +} diff --git a/internal/storage/storage.go b/internal/storage/storage.go new file mode 100644 index 0000000000..3ae33d6d42 --- /dev/null +++ b/internal/storage/storage.go @@ -0,0 +1,307 @@ +package storage + +import ( + "context" + "errors" + "fmt" + + "github.com/hashicorp/consul/proto-public/pbresource" +) + +// Wildcard can be given as Tenancy fields in List and Watch calls, to enumerate +// resources across multiple partitions, peers, namespaces, etc. +const Wildcard = "*" + +var ( + // ErrNotFound indicates that the resource could not be found. + ErrNotFound = errors.New("resource not found") + + // ErrCASFailure indicates that the attempted write failed because the given + // version does not match what is currently stored. + ErrCASFailure = errors.New("CAS operation failed because the given version doesn't match what is stored") + + // ErrWrongUid indicates that the attempted write failed because the resource's + // Uid doesn't match what is currently stored (e.g. the caller is trying to + // operate on a deleted resource with the same name). + ErrWrongUid = errors.New("write failed because the given uid doesn't match what is stored") + + // ErrInconsistent indicates that the attempted write or consistent read could + // not be achieved because of a consistency or availability issue (e.g. loss of + // quorum, or when interacting with a Raft follower). + ErrInconsistent = errors.New("cannot satisfy consistency requirements") +) + +// ReadConsistency is used to specify the required consistency guarantees for +// a read operation. +type ReadConsistency int + +const ( + // EventualConsistency provides a weak set of guarantees, but is much cheaper + // than using StrongConsistency and therefore should be treated as the default. + // + // It guarantees [monotonic reads]. That is, a read will always return results + // that are as up-to-date as an earlier read, provided both happen on the same + // Consul server. But does not make any such guarantee about writes. + // + // In other words, reads won't necessarily reflect earlier writes, even when + // made against the same server. + // + // Operations that don't allow the caller to specify the consistency mode will + // hold the same guarantees as EventualConsistency, but check the method docs + // for caveats. 
+ // + // [monotonic reads]: https://jepsen.io/consistency/models/monotonic-reads + EventualConsistency ReadConsistency = iota + + // StrongConsistency provides a very strong set of guarantees but is much more + // expensive, so should be used sparingly. + // + // It guarantees full [linearizability], such that a read will always return + // the most up-to-date version of a resource, without caveat. + // + // [linearizability]: https://jepsen.io/consistency/models/linearizable + StrongConsistency +) + +// String implements the fmt.Stringer interface. +func (c ReadConsistency) String() string { + switch c { + case EventualConsistency: + return "Eventual Consistency" + case StrongConsistency: + return "Strong Consistency" + } + panic(fmt.Sprintf("unknown ReadConsistency (%d)", c)) +} + +// Backend provides the low-level storage substrate for resources. It can be +// implemented using internal (i.e. Raft+MemDB) or external (e.g. DynamoDB) +// storage systems. +// +// Refer to the method comments for details of the behaviors and invariants +// provided, which are also verified by the conformance test suite in the +// internal/storage/conformance package. +// +// Cross-cutting concerns: +// +// # UIDs +// +// Users identify resources with a name of their choice (e.g. service "billing") +// but internally, we add our own identifier in the Uid field to disambiguate +// references when resources are deleted and re-created with the same name. +// +// # GroupVersion +// +// In order to support automatic translation between schema versions, we only +// store a single version of a resource, and treat types with the same Group +// and Kind, but different GroupVersions, as equivalent. +// +// # Read-Modify-Write Patterns +// +// All writes at the storage backend level are CAS (Compare-And-Swap) operations +// where the caller must provide the resource in its entirety, with the current +// version string. +// +// Non-CAS writes should be implemented at a higher level (i.e. in the Resource +// Service) by reading the resource, applying the user's requested modifications, +// and writing it back. This allows us to ensure we're correctly carrying over +// the resource's Status and Uid, without requiring support for partial update +// or "patch" operations from external storage systems. +// +// In cases where there are concurrent interleaving writes made to a resource, +// it's likely that a CAS operation will fail, so callers may need to put their +// Read-Modify-Write cycle in a retry loop. +type Backend interface { + // Read a resource using its ID. + // + // # UIDs + // + // If id.Uid is empty, Read will ignore it and return whatever resource is + // stored with the given name. This is the desired behavior for user-initiated + // reads. + // + // If id.Uid is non-empty, Read will only return a resource if its Uid matches, + // otherwise it'll return ErrNotFound. This is the desired behaviour for reads + // initiated by controllers, which tend to operate on a specific lifetime of a + // resource. + // + // See Backend docs for more details. + // + // # GroupVersion + // + // If id.Type.GroupVersion doesn't match what is stored, Read will return a + // GroupVersionMismatchError which contains a pointer to the stored resource. + // + // See Backend docs for more details. + // + // # Consistency + // + // Read supports both EventualConsistency and StrongConsistency. 
+ Read(ctx context.Context, consistency ReadConsistency, id *pbresource.ID) (*pbresource.Resource, error) + + // WriteCAS performs an atomic CAS (Compare-And-Swap) write of a resource based + // on its version. The given version will be compared to what is stored, and if + // it does not match, ErrCASFailure will be returned. To create new resources, + // set version to an empty string. + // + // If a write cannot be performed because of a consistency or availability + // issue (e.g. when interacting with a Raft follower, or when quorum is lost) + // ErrInconsistent will be returned. + // + // # UIDs + // + // UIDs are immutable, so if the given resource's Uid field doesn't match what + // is stored, ErrWrongUid will be returned. + // + // See Backend docs for more details. + // + // # GroupVersion + // + // Write does not validate the GroupVersion and allows you to overwrite a + // resource stored in an older form with a newer, and vice versa. + // + // See Backend docs for more details. + WriteCAS(ctx context.Context, res *pbresource.Resource) (*pbresource.Resource, error) + + // DeleteCAS performs an atomic CAS (Compare-And-Swap) deletion of a resource + // based on its version. The given version will be compared to what is stored, + // and if it does not match, ErrCASFailure will be returned. + // + // If the resource does not exist (i.e. has already been deleted) no error will + // be returned. + // + // If a deletion cannot be performed because of a consistency or availability + // issue (e.g. when interacting with a Raft follower, or when quorum is lost) + // ErrInconsistent will be returned. + // + // # UIDs + // + // If the given id's Uid does not match what is stored, the deletion will be a + // no-op (i.e. it is considered to be a different resource). + // + // See Backend docs for more details. + // + // # GroupVersion + // + // Delete does not check or refer to the GroupVersion. Resources of the same + // Group and Kind are considered equivalent, so requests to delete a resource + // using a new GroupVersion will delete a resource even if it's stored with an + // old GroupVersion. + // + // See Backend docs for more details. + DeleteCAS(ctx context.Context, id *pbresource.ID, version string) error + + // List resources of the given type, tenancy, and optionally matching the given + // name prefix. + // + // # Tenancy Wildcard + // + // In order to list resources across multiple tenancy units (e.g. namespaces) + // pass the Wildcard sentinel value in tenancy fields. + // + // # GroupVersion + // + // The resType argument contains only the Group and Kind, to reflect the fact + // that resources may be stored in a mix of old and new forms. As such, it's + // the caller's responsibility to check the resource's GroupVersion and + // translate or filter accordingly. + // + // # Consistency + // + // Generally, List only supports EventualConsistency. However, for backward + // compatability with our v1 APIs, the Raft backend supports StrongConsistency + // for list operations. + // + // When the v1 APIs finally goes away, so will this consistency parameter, so + // it should not be depended on outside of the backward compatability layer. + List(ctx context.Context, consistency ReadConsistency, resType UnversionedType, tenancy *pbresource.Tenancy, namePrefix string) ([]*pbresource.Resource, error) + + // WatchList watches resources of the given type, tenancy, and optionally + // matching the given name prefix. Upsert events for the current state of the + // world (i.e. 
existing resources that match the given filters) will be emitted + // immediately, and will be followed by delta events whenever resources are + // written or deleted. + // + // # Consistency + // + // WatchList makes no guarantees about event timeliness (e.g. an event for a + // write may not be received immediately), but it does guarantee that events + // will be emitted in the correct order. + // + // There's also a guarantee of [monotonic reads] between Read and WatchList, + // such that Read will never return data that is older than the most recent + // event you received. Note: this guarantee holds at the (in-process) storage + // backend level, only. Controllers and other users of the Resource Service API + // must remain connected to the same Consul server process to avoid receiving + // events about writes that they then cannot read. In other words, it is *not* + // linearizable. + // + // There's a similar guarantee between WatchList and OwnerReferences, see the + // OwnerReferences docs for more information. + // + // See List docs for details about Tenancy Wildcard and GroupVersion. + // + // [monotonic reads]: https://jepsen.io/consistency/models/monotonic-reads + WatchList(ctx context.Context, resType UnversionedType, tenancy *pbresource.Tenancy, namePrefix string) (Watch, error) + + // OwnerReferences returns the IDs of resources owned by the resource with the + // given ID. It is typically used to implement cascading deletion. + // + // # Consistency + // + // OwnerReferences may return stale results, but guarnantees [monotonic reads] + // with events received from WatchList. In practice, this means that if you + // learn that a resource has been deleted through a watch event, the results + // you receive from OwnerReferences will contain all references that existed + // at the time the owner was deleted. It doesn't make any guarantees about + // references that are created *after* the owner was deleted, though, so you + // must either prevent that from happening (e.g. by performing a consistent + // read of the owner in the write-path, which has its own ordering/correctness + // challenges), or by calling OwnerReferences after the expected window of + // inconsistency (e.g. deferring cascading deletion, or doing a second pass + // an hour later). + // + // [montonic reads]: https://jepsen.io/consistency/models/monotonic-reads + OwnerReferences(ctx context.Context, id *pbresource.ID) ([]*pbresource.ID, error) +} + +// Watch represents a watch on a given set of resources. Call Next to get the +// next event (i.e. upsert or deletion). +type Watch interface { + // Next returns the next event (i.e. upsert or deletion) + Next(ctx context.Context) (*pbresource.WatchEvent, error) +} + +// UnversionedType represents a pbresource.Type as it is stored without the +// GroupVersion. +type UnversionedType struct { + Group string + Kind string +} + +// UnversionedTypeFrom creates an UnversionedType from the given *pbresource.Type. +func UnversionedTypeFrom(t *pbresource.Type) UnversionedType { + return UnversionedType{ + Group: t.Group, + Kind: t.Kind, + } +} + +// GroupVersionMismatchError is returned when a resource is stored as a type +// with a different GroupVersion than was requested. +type GroupVersionMismatchError struct { + // RequestedType is the type that was requested. + RequestedType *pbresource.Type + + // Stored is the resource as it is stored. + Stored *pbresource.Resource +} + +// Error implements the error interface. 
+func (e GroupVersionMismatchError) Error() string { + return fmt.Sprintf( + "resource was requested with GroupVersion=%q, but stored with GroupVersion=%q", + e.RequestedType.GroupVersion, + e.Stored.Id.Type.GroupVersion, + ) +} diff --git a/proto/private/prototest/testing.go b/proto/private/prototest/testing.go index bf25fb0a10..1baafa2c61 100644 --- a/proto/private/prototest/testing.go +++ b/proto/private/prototest/testing.go @@ -1,13 +1,16 @@ package prototest import ( - "testing" - "github.com/google/go-cmp/cmp" "google.golang.org/protobuf/testing/protocmp" ) -func AssertDeepEqual(t testing.TB, x, y interface{}, opts ...cmp.Option) { +type TestingT interface { + Helper() + Fatalf(string, ...any) +} + +func AssertDeepEqual(t TestingT, x, y interface{}, opts ...cmp.Option) { t.Helper() opts = append(opts, protocmp.Transform()) @@ -24,7 +27,7 @@ func AssertDeepEqual(t testing.TB, x, y interface{}, opts ...cmp.Option) { // // prototest.AssertElementsMatch(t, [1, 3, 2, 3], [1, 3, 3, 2]) func AssertElementsMatch[V any]( - t testing.TB, listX, listY []V, opts ...cmp.Option, + t TestingT, listX, listY []V, opts ...cmp.Option, ) { t.Helper() @@ -73,3 +76,17 @@ func AssertElementsMatch[V any]( t.Fatalf("assertion failed: slices do not have matching elements\n--- expected\n+++ actual\n%v", diff) } } + +func AssertContainsElement[V any](t TestingT, list []V, element V, opts ...cmp.Option) { + t.Helper() + + opts = append(opts, protocmp.Transform()) + + for _, e := range list { + if cmp.Equal(e, element, opts...) { + return + } + } + + t.Fatalf("assertion failed: list does not contain element\n--- list\n%#v\n--- element: %#v", list, element) +}
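The doc comment on query.indexPrefix in internal/storage/inmem/schema.go describes the core indexing trick: each key segment is NULL-terminated so one value can never be mistaken for the prefix of another, and the prefix is cut off at the first wildcarded tenancy field, with everything after it filtered by query.matches. The standalone sketch below mirrors that layout under the same assumptions; prefixUpToWildcard is an illustrative stand-in for the unexported indexBuilder and query types, and the segment order follows indexFromID (group, kind, partition, peer, namespace, name).

package main

import (
	"bytes"
	"fmt"
)

// prefixUpToWildcard NULL-terminates each segment and stops at the first
// wildcard; whatever could not be encoded into the prefix must be filtered in
// code afterwards (query.matches in the diff).
func prefixUpToWildcard(segments ...string) []byte {
	var b bytes.Buffer
	for _, s := range segments {
		if s == "*" { // storage.Wildcard
			break
		}
		b.WriteString(s)
		b.WriteByte(0x00)
	}
	return b.Bytes()
}

func main() {
	// type={mesh,service}, partition=default, peer=*, namespace=default:
	// the scan prefix stops at the wildcarded peer, so the namespace filter is
	// applied per-result rather than by the radix tree.
	fmt.Printf("%q\n", prefixUpToWildcard("mesh", "service", "default", "*", "default"))
	// Output: "mesh\x00service\x00default\x00"
}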
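The Backend docs in internal/storage/storage.go frame every non-CAS write as a Read-Modify-Write cycle on top of WriteCAS, retried when ErrCASFailure signals a concurrent writer. Below is a minimal sketch of such a loop against the documented interface; writeWithRetry, modify, and maxAttempts are illustrative names rather than anything provided by the storage package.

package example

import (
	"context"
	"errors"
	"fmt"

	"google.golang.org/protobuf/proto"

	"github.com/hashicorp/consul/internal/storage"
	"github.com/hashicorp/consul/proto-public/pbresource"
)

// writeWithRetry reads the current resource, applies the caller's mutation, and
// writes it back with CAS, retrying when another writer got there first.
func writeWithRetry(ctx context.Context, b storage.Backend, id *pbresource.ID, modify func(*pbresource.Resource)) (*pbresource.Resource, error) {
	const maxAttempts = 5 // illustrative bound

	for attempt := 0; attempt < maxAttempts; attempt++ {
		res, err := b.Read(ctx, storage.EventualConsistency, id)
		switch {
		case errors.Is(err, storage.ErrNotFound):
			// Creating the resource: an empty Version tells WriteCAS it must
			// not already exist.
			res = &pbresource.Resource{Id: id}
		case err != nil:
			return nil, err
		default:
			// Clone before mutating: the backend may hand back its own copy.
			res = proto.Clone(res).(*pbresource.Resource)
		}

		modify(res)

		written, err := b.WriteCAS(ctx, res)
		switch {
		case errors.Is(err, storage.ErrCASFailure):
			continue // lost the race; re-read and try again
		case err != nil:
			return nil, err
		}
		return written, nil
	}
	return nil, fmt.Errorf("write of %q did not converge after %d attempts", id.Name, maxAttempts)
}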
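Read's documented GroupVersion behaviour attaches the stored resource to the GroupVersionMismatchError it returns, so a caller that can cope with any schema version does not need a second round-trip. A hedged sketch of that pattern follows; readAnyVersion is a hypothetical helper, and a real caller would translate mismatch.Stored to the requested version rather than returning it unchanged.

package example

import (
	"context"
	"errors"

	"github.com/hashicorp/consul/internal/storage"
	"github.com/hashicorp/consul/proto-public/pbresource"
)

// readAnyVersion returns the stored resource even when it was written under a
// different GroupVersion than the one requested in id.Type.
func readAnyVersion(ctx context.Context, b storage.Backend, id *pbresource.ID) (*pbresource.Resource, error) {
	res, err := b.Read(ctx, storage.StrongConsistency, id)
	if err == nil {
		return res, nil
	}

	var mismatch storage.GroupVersionMismatchError
	if errors.As(err, &mismatch) {
		// The stored form rides along on the error, so no second read is
		// needed; a real caller would translate it to the requested version.
		return mismatch.Stored, nil
	}
	return nil, err
}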
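WatchList's contract, an initial snapshot of UPSERT events followed by ordered deltas, is what controllers are expected to build on. The sketch below shows one way a consumer loop might look; watchServices and the mesh/service type are placeholders, not part of this change.

package example

import (
	"context"
	"log"

	"github.com/hashicorp/consul/internal/storage"
	"github.com/hashicorp/consul/proto-public/pbresource"
)

// watchServices subscribes across all partitions, peers, and namespaces using
// the Wildcard sentinel and reacts to the snapshot and subsequent deltas.
func watchServices(ctx context.Context, b storage.Backend) error {
	watch, err := b.WatchList(ctx,
		storage.UnversionedType{Group: "mesh", Kind: "service"}, // placeholder type
		&pbresource.Tenancy{
			Partition: storage.Wildcard,
			PeerName:  storage.Wildcard,
			Namespace: storage.Wildcard,
		},
		"", // no name prefix
	)
	if err != nil {
		return err
	}

	for {
		event, err := watch.Next(ctx)
		if err != nil {
			// Includes cancellation of ctx; bail out and let the caller decide.
			return err
		}

		switch event.Operation {
		case pbresource.WatchEvent_OPERATION_UPSERT:
			log.Printf("upsert %s (version %s)", event.Resource.Id.Name, event.Resource.Version)
		case pbresource.WatchEvent_OPERATION_DELETE:
			log.Printf("delete %s", event.Resource.Id.Name)
		}
	}
}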
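OwnerReferences is described as the building block for cascading deletion, with monotonic reads relative to WatchList. The following sketch performs a single deletion pass under those guarantees; deleteOwned is an illustrative name, and a production implementation would schedule a later pass to catch references created after the owner was deleted.

package example

import (
	"context"
	"errors"

	"github.com/hashicorp/consul/internal/storage"
	"github.com/hashicorp/consul/proto-public/pbresource"
)

// deleteOwned deletes every resource that referenced the (now deleted) owner at
// the time of its deletion.
func deleteOwned(ctx context.Context, b storage.Backend, owner *pbresource.ID) error {
	refs, err := b.OwnerReferences(ctx, owner)
	if err != nil {
		return err
	}

	for _, ref := range refs {
		res, err := b.Read(ctx, storage.EventualConsistency, ref)
		if errors.Is(err, storage.ErrNotFound) {
			// Already gone, or re-created with a new Uid; either way leave it.
			continue
		}
		if err != nil {
			return err
		}

		err = b.DeleteCAS(ctx, res.Id, res.Version)
		if errors.Is(err, storage.ErrCASFailure) {
			continue // modified concurrently; a later pass can retry
		}
		if err != nil {
			return err
		}
	}
	return nil
}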