R.B. Boyer c029b20615
v2: ensure the controller caches are fully populated before first use (#20421)
The new controller caches are initialized before the DependencyMappers or the 
Reconciler run, but importantly they are not populated. The expectation is that 
when the WatchList call is made to the resource service it will send an initial 
snapshot of all resources matching a single type, and then perpetually send 
UPSERT/DELETE events afterward. This initial snapshot will cycle through the 
caching layer and will catch it up to reflect the stored data.

Critically, the dependency mappers and reconcilers will race against the restoration
of the caches on server startup or leader election. During this window a mapper or
reconciler may use the cache to look up a specific relationship and not find it.
That same reconciler may then recompute some persisted resource and, in effect,
rewind it to a previously computed state.

Change

- Since we are updating the behavior of the WatchList RPC, it was aligned to 
  match that of pbsubscribe and pbpeerstream using a protobuf oneof instead of the enum+fields option.

- The WatchList RPC now has three possible response events: Upsert, Delete, and
  EndOfSnapshot. The initial batch of "snapshot" Upserts sent on a new watch is
  followed by a single EndOfSnapshot event, after which the never-ending sequence
  of Upsert/Delete events begins.

- Within the Controller startup code we launch N+1 goroutines to execute WatchList
  queries for the watched types. The resulting Upserts are applied to the nascent
  cache only (no mappers execute).

- Upon witnessing the EndOfSnapshot event, those goroutines terminate.

- When all cache-priming routines have completed, the normal set of N+1 long-lived
  watch routines is launched to officially witness all events in the system using
  the primed caches.
2024-02-02 15:11:05 -06:00

299 lines
7.3 KiB
Go

// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1
package inmem
import (
"context"
"sync"
"time"
"github.com/hashicorp/go-memdb"
"github.com/hashicorp/consul/agent/consul/stream"
"github.com/hashicorp/consul/internal/storage"
"github.com/hashicorp/consul/proto-public/pbresource"
)
// Store implements an in-memory resource database using go-memdb.
//
// It can be used as a storage backend directly via the Backend type in this
// package, but also handles reads in our Raft backend, and can be used as a
// local cache when storing data in external systems (e.g. RDBMS, K/V stores).
//
// Construct a Store with NewStore, which populates db and pub; a zero-value
// Store has neither set.
type Store struct {
	mu sync.RWMutex // guards db, because Restore.Commit will replace it wholesale.

	// db holds the stored resources. The methods in this file open
	// transactions on it via txn, which reads the field under mu.
	db *memdb.MemDB

	// pub is the event publisher that WatchList subscribes to. NewStore
	// creates it and registers the watch snapshot handler; Run drives it until
	// its context is canceled.
	pub *stream.EventPublisher

	// eventLock is used to serialize operations that result in the publishing of
	// events (i.e. writes and deletes) to ensure correct ordering when there are
	// concurrent writers.
	//
	// We cannot rely on MemDB's write lock for this, because events must be
	// published *after* the transaction is committed to provide monotonic reads
	// between Watch and Read calls. In other words, if we were to publish an event
	// before the transaction was committed, there would be a small window of time
	// where a watcher (e.g. controller) could try to Read the resource and not get
	// the version they were notified about.
	//
	// Without this lock, it would be possible to publish events out-of-order.
	eventLock sync.Mutex
}
// NewStore builds an empty Store backed by a fresh memdb database and an event
// publisher with the watch snapshot handler registered for eventTopic.
//
// The returned Store is not ready for use until Run has been started.
// Any error from creating the underlying database is returned unchanged.
func NewStore() (*Store, error) {
	db, err := newDB()
	if err != nil {
		return nil, err
	}

	store := &Store{db: db}
	// The 10s duration is passed straight through to NewEventPublisher.
	store.pub = stream.NewEventPublisher(10 * time.Second)
	// watchSnapshot supplies the initial snapshot for new subscriptions on
	// eventTopic; the trailing false is forwarded to RegisterHandler as-is.
	store.pub.RegisterHandler(eventTopic, store.watchSnapshot, false)

	return store, nil
}
// Run drives the store's event publisher and only returns once ctx is
// canceled. Because it blocks, start it in its own goroutine.
func (s *Store) Run(ctx context.Context) {
	s.pub.Run(ctx)
}
// Read returns the stored resource identified by id.
//
// It returns storage.ErrNotFound when no resource has that ID, or when id.Uid
// is non-empty and differs from the stored resource's Uid. When the requested
// GroupVersion differs from the stored one, it returns a
// storage.GroupVersionMismatchError carrying the stored resource, so the caller
// can upgrade/downgrade the schema version.
//
// For more information, see the storage.Backend documentation.
func (s *Store) Read(id *pbresource.ID) (*pbresource.Resource, error) {
	tx := s.txn(false)
	defer tx.Abort()

	raw, err := tx.First(tableNameResources, indexNameID, id)
	switch {
	case err != nil:
		return nil, err
	case raw == nil:
		return nil, storage.ErrNotFound
	}
	stored := raw.(*pbresource.Resource)

	// A non-empty Uid on the request pins one specific incarnation of the resource.
	if wantUid := id.Uid; wantUid != "" && stored.Id.Uid != wantUid {
		return nil, storage.ErrNotFound
	}

	if requested, have := id.Type.GroupVersion, stored.Id.Type.GroupVersion; requested != have {
		return nil, storage.GroupVersionMismatchError{
			RequestedType: id.Type,
			Stored:        stored,
		}
	}

	return stored, nil
}
// WriteCAS performs an atomic Compare-And-Swap (CAS) write of a resource.
//
// The write is accepted when either:
//   - no resource with res.Id exists and vsn is empty (initial creation), or
//   - the stored resource has the same Uid as res and its Version equals vsn.
//
// Otherwise it returns storage.ErrWrongUid (Uid mismatch) or
// storage.ErrCASFailure (version mismatch, or a non-empty vsn for a resource
// that does not exist). Errors from memdb or from bumping the event index are
// returned unchanged, and in that case nothing is committed or published.
//
// eventLock is held across the commit and the publishing of the Upsert event so
// that events are published in commit order.
//
// For more information, see the storage.Backend documentation.
func (s *Store) WriteCAS(res *pbresource.Resource, vsn string) error {
	s.eventLock.Lock()
	defer s.eventLock.Unlock()

	tx := s.txn(true)
	// Discards the transaction on any early return; a no-op after Commit.
	defer tx.Abort()

	existing, err := tx.First(tableNameResources, indexNameID, res.Id)
	if err != nil {
		return err
	}

	// Callers provide an empty version string on initial resource creation.
	if existing == nil && vsn != "" {
		return storage.ErrCASFailure
	}

	if existing != nil {
		existingRes := existing.(*pbresource.Resource)

		// Uid is immutable.
		if existingRes.Id.Uid != res.Id.Uid {
			return storage.ErrWrongUid
		}

		// Ensure CAS semantics.
		if existingRes.Version != vsn {
			return storage.ErrCASFailure
		}
	}

	if err := tx.Insert(tableNameResources, res); err != nil {
		return err
	}

	idx, err := incrementEventIndex(tx)
	if err != nil {
		// The deferred Abort rolls back the insert, so the caller must be told
		// the write failed rather than receiving nil.
		return err
	}

	tx.Commit()

	s.publishEvent(idx, &pbresource.WatchEvent{
		Event: &pbresource.WatchEvent_Upsert_{
			Upsert: &pbresource.WatchEvent_Upsert{
				Resource: res,
			},
		},
	})

	return nil
}
// DeleteCAS performs an atomic Compare-And-Swap (CAS) deletion of a resource.
//
// Deleting a resource that does not exist, or one whose stored Uid differs
// from id.Uid, is a no-op and returns nil. If the stored Version differs from
// vsn it returns storage.ErrCASFailure. Errors from memdb or from bumping the
// event index are returned unchanged, and in that case nothing is committed or
// published.
//
// eventLock is held across the commit and the publishing of the Delete event so
// that events are published in commit order.
//
// For more information, see the storage.Backend documentation.
func (s *Store) DeleteCAS(id *pbresource.ID, vsn string) error {
	s.eventLock.Lock()
	defer s.eventLock.Unlock()

	tx := s.txn(true)
	// Discards the transaction on any early return; a no-op after Commit.
	defer tx.Abort()

	existing, err := tx.First(tableNameResources, indexNameID, id)
	if err != nil {
		return err
	}

	// Deleting an already deleted resource is a no-op.
	if existing == nil {
		return nil
	}

	res := existing.(*pbresource.Resource)

	// Deleting a resource using a previous Uid is a no-op.
	if id.Uid != res.Id.Uid {
		return nil
	}

	// Ensure CAS semantics.
	if vsn != res.Version {
		return storage.ErrCASFailure
	}

	if err := tx.Delete(tableNameResources, id); err != nil {
		return err
	}

	idx, err := incrementEventIndex(tx)
	if err != nil {
		// The deferred Abort rolls back the delete, so the caller must be told
		// the delete failed rather than receiving nil.
		return err
	}

	tx.Commit()

	s.publishEvent(idx, &pbresource.WatchEvent{
		Event: &pbresource.WatchEvent_Delete_{
			Delete: &pbresource.WatchEvent_Delete{
				Resource: res,
			},
		},
	})

	return nil
}
// List returns the resources of the given type and tenancy whose names start
// with namePrefix (an empty prefix matches every name).
//
// The work is delegated to listTxn inside a read-only transaction.
//
// For more information, see the storage.Backend documentation.
func (s *Store) List(typ storage.UnversionedType, ten *pbresource.Tenancy, namePrefix string) ([]*pbresource.Resource, error) {
	q := query{
		resourceType: typ,
		tenancy:      ten,
		namePrefix:   namePrefix,
	}

	tx := s.txn(false)
	defer tx.Abort()

	return listTxn(tx, q)
}
// listTxn scans the ID prefix index within tx for candidates matching q. It
// keeps only those for which q.matches reports true.
//
// On success the result is never nil: an empty, non-nil slice is returned
// when nothing matches.
func listTxn(tx *memdb.Txn, q query) ([]*pbresource.Resource, error) {
	it, err := tx.Get(tableNameResources, indexNameID+"_prefix", q)
	if err != nil {
		return nil, err
	}

	matched := []*pbresource.Resource{}
	for {
		raw := it.Next()
		if raw == nil {
			break
		}
		// The prefix index can over-select, so re-check each candidate.
		if r := raw.(*pbresource.Resource); q.matches(r) {
			matched = append(matched, r)
		}
	}

	return matched, nil
}
// WatchList subscribes to changes for resources of the given type and tenancy
// whose names start with namePrefix.
//
// When either the partition or the namespace is the wildcard, the subscription
// covers resources in all partitions, peers, and namespaces, and irrelevant
// events are filtered out manually (in Watch.Next). When exact tenancy values
// are given, the subscription targets only the relevant resources, which is far
// more efficient.
//
// For more information, see the storage.Backend documentation.
func (s *Store) WatchList(typ storage.UnversionedType, ten *pbresource.Tenancy, namePrefix string) (*Watch, error) {
	q := query{
		resourceType: typ,
		tenancy:      ten,
		namePrefix:   namePrefix,
	}

	var subject stream.Subject
	switch {
	case ten.Partition == storage.Wildcard, ten.Namespace == storage.Wildcard:
		// TODO(peering/v2) update conditional to handle peer tenancy wildcards
		subject = wildcardSubject{typ}
	default:
		subject = tenancySubject{typ, ten}
	}

	subscription, err := s.pub.Subscribe(&stream.SubscribeRequest{
		Topic:   eventTopic,
		Subject: subject,
	})
	if err != nil {
		return nil, err
	}

	return &Watch{sub: subscription, query: q}, nil
}
// ListByOwner returns the resources found under the owner index for id.
//
// When no resource is owned by id, the result is a nil slice and a nil error.
//
// For more information, see the storage.Backend documentation.
func (s *Store) ListByOwner(id *pbresource.ID) ([]*pbresource.Resource, error) {
	tx := s.txn(false)
	defer tx.Abort()

	it, err := tx.Get(tableNameResources, indexNameOwner, id)
	if err != nil {
		return nil, err
	}

	var owned []*pbresource.Resource
	for {
		raw := it.Next()
		if raw == nil {
			return owned, nil
		}
		owned = append(owned, raw.(*pbresource.Resource))
	}
}
// txn opens a memdb transaction on the store's current database. It is a
// write transaction when write is true and a read-only one otherwise.
//
// s.mu is read-locked while s.db is read and Txn is called on it, so a
// concurrent wholesale replacement of s.db cannot happen mid-call. The deferred
// RUnlock releases the lock when this function returns, including on panic.
//
// NOTE(review): for write=true, memdb's Txn acquires the database's writer
// lock, so this call may block while still holding s.mu's read lock. That
// would delay anything waiting on s.mu's write lock; confirm this ordering is
// intended.
func (s *Store) txn(write bool) *memdb.Txn {
	s.mu.RLock()
	defer s.mu.RUnlock()
	return s.db.Txn(write)
}