streaming: emit events when Connect CA Roots change (#12590)

OSS sync of enterprise changes at 614f786d
This commit is contained in:
Dan Upton 2022-03-22 19:13:59 +00:00 committed by GitHub
parent a7e5ee005a
commit f8e2e3c710
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 201 additions and 9 deletions

View File

@ -265,6 +265,15 @@ func (s *Store) CARootSetCAS(idx, cidx uint64, rs []*structs.CARoot) (bool, erro
tx := s.db.WriteTxn(idx) tx := s.db.WriteTxn(idx)
defer tx.Abort() defer tx.Abort()
if err := caRootSetCASTxn(tx, idx, cidx, rs); err != nil {
return false, err
}
err := tx.Commit()
return err == nil, err
}
func caRootSetCASTxn(tx WriteTxn, idx, cidx uint64, rs []*structs.CARoot) error {
// There must be exactly one active CA root. // There must be exactly one active CA root.
activeCount := 0 activeCount := 0
for _, r := range rs { for _, r := range rs {
@ -273,24 +282,24 @@ func (s *Store) CARootSetCAS(idx, cidx uint64, rs []*structs.CARoot) (bool, erro
} }
} }
if activeCount != 1 { if activeCount != 1 {
return false, fmt.Errorf("there must be exactly one active CA") return fmt.Errorf("there must be exactly one active CA")
} }
// Get the current max index // Get the current max index
if midx := maxIndexTxn(tx, tableConnectCARoots); midx != cidx { if midx := maxIndexTxn(tx, tableConnectCARoots); midx != cidx {
return false, nil return nil
} }
// Go through and find any existing matching CAs so we can preserve and // Go through and find any existing matching CAs so we can preserve and
// update their Create/ModifyIndex values. // update their Create/ModifyIndex values.
for _, r := range rs { for _, r := range rs {
if r.ID == "" { if r.ID == "" {
return false, ErrMissingCARootID return ErrMissingCARootID
} }
existing, err := tx.First(tableConnectCARoots, "id", r.ID) existing, err := tx.First(tableConnectCARoots, "id", r.ID)
if err != nil { if err != nil {
return false, fmt.Errorf("failed CA root lookup: %s", err) return fmt.Errorf("failed CA root lookup: %s", err)
} }
if existing != nil { if existing != nil {
@ -304,23 +313,22 @@ func (s *Store) CARootSetCAS(idx, cidx uint64, rs []*structs.CARoot) (bool, erro
// Delete all // Delete all
_, err := tx.DeleteAll(tableConnectCARoots, "id") _, err := tx.DeleteAll(tableConnectCARoots, "id")
if err != nil { if err != nil {
return false, err return err
} }
// Insert all // Insert all
for _, r := range rs { for _, r := range rs {
if err := tx.Insert(tableConnectCARoots, r); err != nil { if err := tx.Insert(tableConnectCARoots, r); err != nil {
return false, err return err
} }
} }
// Update the index // Update the index
if err := tx.Insert(tableIndex, &IndexEntry{tableConnectCARoots, idx}); err != nil { if err := tx.Insert(tableIndex, &IndexEntry{tableConnectCARoots, idx}); err != nil {
return false, fmt.Errorf("failed updating index: %s", err) return fmt.Errorf("failed updating index: %s", err)
} }
err = tx.Commit() return nil
return err == nil, err
} }
// CAProviderState is used to pull the built-in provider states from the snapshot. // CAProviderState is used to pull the built-in provider states from the snapshot.

View File

@ -0,0 +1,82 @@
package state
import (
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/consul/stream"
"github.com/hashicorp/consul/agent/structs"
)
// EventTopicCARoots is the streaming topic to which events will be published
// when the list of active CA Roots changes. Each event payload contains the
// full list of roots.
//
// Note: topics are ordinarily defined in subscribe.proto, but this one isn't
// currently available via the Subscribe endpoint.
const EventTopicCARoots stringTopic = "CARoots"
type stringTopic string
func (s stringTopic) String() string { return string(s) }
type EventPayloadCARoots struct {
CARoots structs.CARoots
}
func (e EventPayloadCARoots) Subject() stream.Subject { return stream.SubjectNone }
func (e EventPayloadCARoots) HasReadPermission(authz acl.Authorizer) bool {
// TODO(agentless): implement this method once the Authorizer exposes a method
// to check for `service:write` on any service.
panic("EventPayloadCARoots does not implement HasReadPermission")
}
// caRootsChangeEvents returns an event on EventTopicCARoots whenever the list
// of active CA Roots changes.
func caRootsChangeEvents(tx ReadTxn, changes Changes) ([]stream.Event, error) {
var rootsChanged bool
for _, c := range changes.Changes {
if c.Table == tableConnectCARoots {
rootsChanged = true
break
}
}
if !rootsChanged {
return nil, nil
}
_, roots, err := caRootsTxn(tx, nil)
if err != nil {
return nil, err
}
return []stream.Event{
{
Topic: EventTopicCARoots,
Index: changes.Index,
Payload: EventPayloadCARoots{CARoots: roots},
},
}, nil
}
// caRootsSnapshot returns a stream.SnapshotFunc that provides a snapshot of
// the current active list of CA Roots.
func caRootsSnapshot(db ReadDB) stream.SnapshotFunc {
return func(_ stream.SubscribeRequest, buf stream.SnapshotAppender) (uint64, error) {
tx := db.ReadTxn()
defer tx.Abort()
idx, roots, err := caRootsTxn(tx, nil)
if err != nil {
return 0, err
}
buf.Append([]stream.Event{
{
Topic: EventTopicCARoots,
Index: idx,
Payload: EventPayloadCARoots{CARoots: roots},
},
})
return idx, nil
}
}

View File

@ -0,0 +1,95 @@
package state
import (
"testing"
"github.com/stretchr/testify/require"
"github.com/hashicorp/consul/agent/connect"
"github.com/hashicorp/consul/agent/consul/stream"
"github.com/hashicorp/consul/agent/structs"
)
func TestCARootsEvents(t *testing.T) {
store := testStateStore(t)
rootA := connect.TestCA(t, nil)
_, err := store.CARootSetCAS(1, 0, structs.CARoots{rootA})
require.NoError(t, err)
t.Run("roots changed", func(t *testing.T) {
tx := store.db.WriteTxn(2)
defer tx.Abort()
rootB := connect.TestCA(t, nil)
err = caRootSetCASTxn(tx, 2, 1, structs.CARoots{rootB})
require.NoError(t, err)
events, err := caRootsChangeEvents(tx, Changes{Index: 2, Changes: tx.Changes()})
require.NoError(t, err)
require.Equal(t, []stream.Event{
{
Topic: EventTopicCARoots,
Index: 2,
Payload: EventPayloadCARoots{
CARoots: structs.CARoots{rootB},
},
},
}, events)
})
t.Run("no change", func(t *testing.T) {
tx := store.db.ReadTxn()
defer tx.Abort()
events, err := caRootsChangeEvents(tx, Changes{Index: 2, Changes: tx.Changes()})
require.NoError(t, err)
require.Empty(t, events)
})
}
func TestCARootsSnapshot(t *testing.T) {
store := testStateStore(t)
fn := caRootsSnapshot((*readDB)(store.db.db))
var req stream.SubscribeRequest
t.Run("no roots", func(t *testing.T) {
buf := &snapshotAppender{}
idx, err := fn(req, buf)
require.NoError(t, err)
require.Equal(t, uint64(0), idx)
require.Len(t, buf.events, 1)
require.Len(t, buf.events[0], 1)
payload := buf.events[0][0].Payload.(EventPayloadCARoots)
require.Empty(t, payload.CARoots)
})
t.Run("with roots", func(t *testing.T) {
buf := &snapshotAppender{}
root := connect.TestCA(t, nil)
_, err := store.CARootSetCAS(1, 0, structs.CARoots{root})
require.NoError(t, err)
idx, err := fn(req, buf)
require.NoError(t, err)
require.Equal(t, uint64(1), idx)
require.Equal(t, buf.events, [][]stream.Event{
{
{
Topic: EventTopicCARoots,
Index: 1,
Payload: EventPayloadCARoots{
CARoots: structs.CARoots{root},
},
},
},
})
})
}

View File

@ -187,6 +187,7 @@ func processDBChanges(tx ReadTxn, changes Changes) ([]stream.Event, error) {
var events []stream.Event var events []stream.Event
fns := []func(tx ReadTxn, changes Changes) ([]stream.Event, error){ fns := []func(tx ReadTxn, changes Changes) ([]stream.Event, error){
aclChangeUnsubscribeEvent, aclChangeUnsubscribeEvent,
caRootsChangeEvents,
ServiceHealthEventsFromChanges, ServiceHealthEventsFromChanges,
// TODO: add other table handlers here. // TODO: add other table handlers here.
} }
@ -204,5 +205,6 @@ func newSnapshotHandlers(db ReadDB) stream.SnapshotHandlers {
return stream.SnapshotHandlers{ return stream.SnapshotHandlers{
topicServiceHealth: serviceHealthSnapshot(db, topicServiceHealth), topicServiceHealth: serviceHealthSnapshot(db, topicServiceHealth),
topicServiceHealthConnect: serviceHealthSnapshot(db, topicServiceHealthConnect), topicServiceHealthConnect: serviceHealthSnapshot(db, topicServiceHealthConnect),
EventTopicCARoots: caRootsSnapshot(db),
} }
} }

View File

@ -19,6 +19,11 @@ type Topic fmt.Stringer
// normalized resource name (including partition and namespace if applicable). // normalized resource name (including partition and namespace if applicable).
type Subject string type Subject string
// SubjectNone is used when all events on a given topic are "global" and not
// further partitioned by subject. For example: the "CA Roots" topic which is
// used to notify subscribers when the global set CA root certificates changes.
const SubjectNone Subject = "none"
// Event is a structure with identifiers and a payload. Events are Published to // Event is a structure with identifiers and a payload. Events are Published to
// EventPublisher and returned to Subscribers. // EventPublisher and returned to Subscribers.
type Event struct { type Event struct {