2023-03-28 23:48:58 +01:00
|
|
|
// Copyright (c) HashiCorp, Inc.
|
2023-08-11 09:12:13 -04:00
|
|
|
// SPDX-License-Identifier: BUSL-1.1
|
2023-03-28 23:48:58 +01:00
|
|
|
|
Add storage backend interface and in-memory implementation (#16538)
Introduces `storage.Backend`, which will serve as the interface between the
Resource Service and the underlying storage system (Raft today, but in the
future, who knows!).
The primary design goal of this interface is to keep its surface area small,
and push as much functionality as possible into the layers above, so that new
implementations can be added with little effort, and easily proven to be
correct. To that end, we also provide a suite of "conformance" tests that can
be run against a backend implementation to check it behaves correctly.
In this commit, we introduce an initial in-memory storage backend, which is
suitable for tests and when running Consul in development mode. This backend is
a thin wrapper around the `Store` type, which implements a resource database
using go-memdb and our internal pub/sub system. `Store` will also be used to
handle reads in our Raft backend, and in the future, used as a local cache for
external storage systems.
2023-03-27 10:30:53 +01:00
|
|
|
package inmem
|
|
|
|
|
|
|
|
import (
|
|
|
|
"context"
|
|
|
|
"fmt"
|
|
|
|
|
|
|
|
"github.com/hashicorp/consul/acl"
|
|
|
|
"github.com/hashicorp/consul/agent/consul/stream"
|
|
|
|
"github.com/hashicorp/consul/internal/storage"
|
|
|
|
"github.com/hashicorp/consul/proto-public/pbresource"
|
|
|
|
"github.com/hashicorp/consul/proto/private/pbsubscribe"
|
|
|
|
)
|
|
|
|
|
|
|
|
// Watch implements the storage.Watch interface using a stream.Subscription.
|
|
|
|
type Watch struct {
|
|
|
|
sub *stream.Subscription
|
|
|
|
query query
|
|
|
|
|
|
|
|
// events holds excess events when they are bundled in a stream.PayloadEvents,
|
|
|
|
// until Next is called again.
|
|
|
|
events []stream.Event
|
|
|
|
}
|
|
|
|
|
|
|
|
// Next returns the next WatchEvent, blocking until one is available.
|
|
|
|
func (w *Watch) Next(ctx context.Context) (*pbresource.WatchEvent, error) {
|
|
|
|
for {
|
|
|
|
e, err := w.nextEvent(ctx)
|
2023-04-04 17:30:06 +01:00
|
|
|
if err == stream.ErrSubForceClosed {
|
|
|
|
return nil, storage.ErrWatchClosed
|
|
|
|
}
|
Add storage backend interface and in-memory implementation (#16538)
Introduces `storage.Backend`, which will serve as the interface between the
Resource Service and the underlying storage system (Raft today, but in the
future, who knows!).
The primary design goal of this interface is to keep its surface area small,
and push as much functionality as possible into the layers above, so that new
implementations can be added with little effort, and easily proven to be
correct. To that end, we also provide a suite of "conformance" tests that can
be run against a backend implementation to check it behaves correctly.
In this commit, we introduce an initial in-memory storage backend, which is
suitable for tests and when running Consul in development mode. This backend is
a thin wrapper around the `Store` type, which implements a resource database
using go-memdb and our internal pub/sub system. `Store` will also be used to
handle reads in our Raft backend, and in the future, used as a local cache for
external storage systems.
2023-03-27 10:30:53 +01:00
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
|
|
|
event := e.Payload.(eventPayload).event
|
|
|
|
if w.query.matches(event.Resource) {
|
|
|
|
return event, nil
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func (w *Watch) nextEvent(ctx context.Context) (*stream.Event, error) {
|
|
|
|
if len(w.events) != 0 {
|
|
|
|
event := w.events[0]
|
|
|
|
w.events = w.events[1:]
|
|
|
|
return &event, nil
|
|
|
|
}
|
|
|
|
|
2023-04-26 13:59:58 +01:00
|
|
|
var idx uint64
|
Add storage backend interface and in-memory implementation (#16538)
Introduces `storage.Backend`, which will serve as the interface between the
Resource Service and the underlying storage system (Raft today, but in the
future, who knows!).
The primary design goal of this interface is to keep its surface area small,
and push as much functionality as possible into the layers above, so that new
implementations can be added with little effort, and easily proven to be
correct. To that end, we also provide a suite of "conformance" tests that can
be run against a backend implementation to check it behaves correctly.
In this commit, we introduce an initial in-memory storage backend, which is
suitable for tests and when running Consul in development mode. This backend is
a thin wrapper around the `Store` type, which implements a resource database
using go-memdb and our internal pub/sub system. `Store` will also be used to
handle reads in our Raft backend, and in the future, used as a local cache for
external storage systems.
2023-03-27 10:30:53 +01:00
|
|
|
for {
|
|
|
|
e, err := w.sub.Next(ctx)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
|
|
|
if e.IsFramingEvent() {
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
2023-04-26 13:59:58 +01:00
|
|
|
// This works around a *very* rare race-condition in the EventPublisher where
|
|
|
|
// it's possible to see duplicate events when events are published at the same
|
|
|
|
// time as the first subscription is created on a {topic, subject} pair.
|
|
|
|
//
|
|
|
|
// We see this problem when a call to WriteCAS is happening in parallel with
|
|
|
|
// a call to WatchList. It happens because our snapshot handler returns events
|
|
|
|
// that have not yet been published (in the gap between us committing changes
|
|
|
|
// to MemDB and the EventPublisher dispatching events onto its event buffers).
|
|
|
|
//
|
|
|
|
// An intuitive solution to this problem would be to take eventLock in the
|
|
|
|
// snapshot handler to avoid it racing with publishing, but this does not
|
|
|
|
// work because publishing is asynchronous.
|
|
|
|
//
|
|
|
|
// We should fix this problem at the root, but it's complicated, so for now
|
|
|
|
// we'll work around it.
|
|
|
|
if e.Index <= idx {
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
idx = e.Index
|
|
|
|
|
Add storage backend interface and in-memory implementation (#16538)
Introduces `storage.Backend`, which will serve as the interface between the
Resource Service and the underlying storage system (Raft today, but in the
future, who knows!).
The primary design goal of this interface is to keep its surface area small,
and push as much functionality as possible into the layers above, so that new
implementations can be added with little effort, and easily proven to be
correct. To that end, we also provide a suite of "conformance" tests that can
be run against a backend implementation to check it behaves correctly.
In this commit, we introduce an initial in-memory storage backend, which is
suitable for tests and when running Consul in development mode. This backend is
a thin wrapper around the `Store` type, which implements a resource database
using go-memdb and our internal pub/sub system. `Store` will also be used to
handle reads in our Raft backend, and in the future, used as a local cache for
external storage systems.
2023-03-27 10:30:53 +01:00
|
|
|
switch t := e.Payload.(type) {
|
|
|
|
case eventPayload:
|
|
|
|
return &e, nil
|
|
|
|
case *stream.PayloadEvents:
|
|
|
|
if len(t.Items) == 0 {
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
|
|
|
event, rest := t.Items[0], t.Items[1:]
|
|
|
|
w.events = rest
|
|
|
|
return &event, nil
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2023-03-31 13:24:19 +01:00
|
|
|
// Close the watch and free its associated resources.
|
|
|
|
func (w *Watch) Close() { w.sub.Unsubscribe() }
|
|
|
|
|
Add storage backend interface and in-memory implementation (#16538)
Introduces `storage.Backend`, which will serve as the interface between the
Resource Service and the underlying storage system (Raft today, but in the
future, who knows!).
The primary design goal of this interface is to keep its surface area small,
and push as much functionality as possible into the layers above, so that new
implementations can be added with little effort, and easily proven to be
correct. To that end, we also provide a suite of "conformance" tests that can
be run against a backend implementation to check it behaves correctly.
In this commit, we introduce an initial in-memory storage backend, which is
suitable for tests and when running Consul in development mode. This backend is
a thin wrapper around the `Store` type, which implements a resource database
using go-memdb and our internal pub/sub system. `Store` will also be used to
handle reads in our Raft backend, and in the future, used as a local cache for
external storage systems.
2023-03-27 10:30:53 +01:00
|
|
|
// eventTopic is the topic set on every stream.Event built in this file, both
// for published changes and for snapshot events.
var eventTopic = stream.StringTopic("resources")
|
|
|
|
|
|
|
|
type eventPayload struct {
|
|
|
|
subject stream.Subject
|
|
|
|
event *pbresource.WatchEvent
|
|
|
|
}
|
|
|
|
|
|
|
|
func (p eventPayload) Subject() stream.Subject { return p.subject }
|
|
|
|
|
|
|
|
// These methods are required by the stream.Payload interface, but we don't use them.
|
|
|
|
func (eventPayload) HasReadPermission(acl.Authorizer) bool { return false }
|
|
|
|
func (eventPayload) ToSubscriptionEvent(uint64) *pbsubscribe.Event { return nil }
|
|
|
|
|
|
|
|
type wildcardSubject struct {
|
|
|
|
resourceType storage.UnversionedType
|
|
|
|
}
|
|
|
|
|
|
|
|
func (s wildcardSubject) String() string {
|
|
|
|
return s.resourceType.Group + indexSeparator +
|
|
|
|
s.resourceType.Kind + indexSeparator +
|
|
|
|
storage.Wildcard
|
|
|
|
}
|
|
|
|
|
|
|
|
type tenancySubject struct {
|
|
|
|
resourceType storage.UnversionedType
|
|
|
|
tenancy *pbresource.Tenancy
|
|
|
|
}
|
|
|
|
|
|
|
|
func (s tenancySubject) String() string {
|
|
|
|
return s.resourceType.Group + indexSeparator +
|
|
|
|
s.resourceType.Kind + indexSeparator +
|
|
|
|
s.tenancy.Partition + indexSeparator +
|
|
|
|
s.tenancy.PeerName + indexSeparator +
|
|
|
|
s.tenancy.Namespace
|
|
|
|
}
|
|
|
|
|
|
|
|
// publishEvent sends the event to the relevant Watches.
|
|
|
|
func (s *Store) publishEvent(idx uint64, op pbresource.WatchEvent_Operation, res *pbresource.Resource) {
|
|
|
|
id := res.Id
|
|
|
|
resourceType := storage.UnversionedTypeFrom(id.Type)
|
|
|
|
event := &pbresource.WatchEvent{Operation: op, Resource: res}
|
|
|
|
|
|
|
|
// We publish two copies of the event: one to the tenancy-specific subject and
|
|
|
|
// another to a wildcard subject. Ideally, we'd be able to put the type in the
|
|
|
|
// topic instead and use stream.SubjectWildcard, but this requires knowing all
|
|
|
|
// types up-front (to register the snapshot handlers).
|
|
|
|
s.pub.Publish([]stream.Event{
|
|
|
|
{
|
|
|
|
Topic: eventTopic,
|
|
|
|
Index: idx,
|
|
|
|
Payload: eventPayload{
|
|
|
|
subject: wildcardSubject{resourceType},
|
|
|
|
event: event,
|
|
|
|
},
|
|
|
|
},
|
|
|
|
{
|
|
|
|
Topic: eventTopic,
|
|
|
|
Index: idx,
|
|
|
|
Payload: eventPayload{
|
|
|
|
subject: tenancySubject{
|
|
|
|
resourceType: resourceType,
|
|
|
|
tenancy: id.Tenancy,
|
|
|
|
},
|
|
|
|
event: event,
|
|
|
|
},
|
|
|
|
},
|
|
|
|
})
|
|
|
|
}
|
|
|
|
|
|
|
|
// watchSnapshot implements a stream.SnapshotFunc to provide upsert events for
|
|
|
|
// the initial state of the world.
|
|
|
|
func (s *Store) watchSnapshot(req stream.SubscribeRequest, snap stream.SnapshotAppender) (uint64, error) {
|
|
|
|
var q query
|
|
|
|
switch t := req.Subject.(type) {
|
|
|
|
case tenancySubject:
|
|
|
|
q.resourceType = t.resourceType
|
|
|
|
q.tenancy = t.tenancy
|
|
|
|
case wildcardSubject:
|
|
|
|
q.resourceType = t.resourceType
|
|
|
|
q.tenancy = &pbresource.Tenancy{
|
|
|
|
Partition: storage.Wildcard,
|
|
|
|
PeerName: storage.Wildcard,
|
|
|
|
Namespace: storage.Wildcard,
|
|
|
|
}
|
|
|
|
default:
|
|
|
|
return 0, fmt.Errorf("unhandled subject type: %T", req.Subject)
|
|
|
|
}
|
|
|
|
|
2023-04-04 17:30:06 +01:00
|
|
|
tx := s.txn(false)
|
Add storage backend interface and in-memory implementation (#16538)
Introduces `storage.Backend`, which will serve as the interface between the
Resource Service and the underlying storage system (Raft today, but in the
future, who knows!).
The primary design goal of this interface is to keep its surface area small,
and push as much functionality as possible into the layers above, so that new
implementations can be added with little effort, and easily proven to be
correct. To that end, we also provide a suite of "conformance" tests that can
be run against a backend implementation to check it behaves correctly.
In this commit, we introduce an initial in-memory storage backend, which is
suitable for tests and when running Consul in development mode. This backend is
a thin wrapper around the `Store` type, which implements a resource database
using go-memdb and our internal pub/sub system. `Store` will also be used to
handle reads in our Raft backend, and in the future, used as a local cache for
external storage systems.
2023-03-27 10:30:53 +01:00
|
|
|
defer tx.Abort()
|
|
|
|
|
|
|
|
idx, err := currentEventIndex(tx)
|
|
|
|
if err != nil {
|
|
|
|
return 0, err
|
|
|
|
}
|
|
|
|
|
|
|
|
results, err := listTxn(tx, q)
|
|
|
|
if err != nil {
|
|
|
|
return 0, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
events := make([]stream.Event, len(results))
|
|
|
|
for i, r := range results {
|
|
|
|
events[i] = stream.Event{
|
|
|
|
Topic: eventTopic,
|
|
|
|
Index: idx,
|
|
|
|
Payload: eventPayload{
|
|
|
|
subject: req.Subject,
|
|
|
|
event: &pbresource.WatchEvent{
|
|
|
|
Operation: pbresource.WatchEvent_OPERATION_UPSERT,
|
|
|
|
Resource: r,
|
|
|
|
},
|
|
|
|
},
|
|
|
|
}
|
|
|
|
}
|
|
|
|
snap.Append(events)
|
|
|
|
|
|
|
|
return idx, nil
|
|
|
|
}
|