From f8e2e3c7100c9bf65d8274ce115b11f226385395 Mon Sep 17 00:00:00 2001 From: Dan Upton Date: Tue, 22 Mar 2022 19:13:59 +0000 Subject: [PATCH] streaming: emit events when Connect CA Roots change (#12590) OSS sync of enterprise changes at 614f786d --- agent/consul/state/connect_ca.go | 26 ++++-- agent/consul/state/connect_ca_events.go | 82 +++++++++++++++++ agent/consul/state/connect_ca_events_test.go | 95 ++++++++++++++++++++ agent/consul/state/memdb.go | 2 + agent/consul/stream/event.go | 5 ++ 5 files changed, 201 insertions(+), 9 deletions(-) create mode 100644 agent/consul/state/connect_ca_events.go create mode 100644 agent/consul/state/connect_ca_events_test.go diff --git a/agent/consul/state/connect_ca.go b/agent/consul/state/connect_ca.go index 156c79f463..b070311470 100644 --- a/agent/consul/state/connect_ca.go +++ b/agent/consul/state/connect_ca.go @@ -265,6 +265,15 @@ func (s *Store) CARootSetCAS(idx, cidx uint64, rs []*structs.CARoot) (bool, erro tx := s.db.WriteTxn(idx) defer tx.Abort() + if err := caRootSetCASTxn(tx, idx, cidx, rs); err != nil { + return false, err + } + + err := tx.Commit() + return err == nil, err +} + +func caRootSetCASTxn(tx WriteTxn, idx, cidx uint64, rs []*structs.CARoot) error { // There must be exactly one active CA root. activeCount := 0 for _, r := range rs { @@ -273,24 +282,24 @@ func (s *Store) CARootSetCAS(idx, cidx uint64, rs []*structs.CARoot) (bool, erro } } if activeCount != 1 { - return false, fmt.Errorf("there must be exactly one active CA") + return fmt.Errorf("there must be exactly one active CA") } // Get the current max index if midx := maxIndexTxn(tx, tableConnectCARoots); midx != cidx { - return false, nil + return nil } // Go through and find any existing matching CAs so we can preserve and // update their Create/ModifyIndex values. for _, r := range rs { if r.ID == "" { - return false, ErrMissingCARootID + return ErrMissingCARootID } existing, err := tx.First(tableConnectCARoots, "id", r.ID) if err != nil { - return false, fmt.Errorf("failed CA root lookup: %s", err) + return fmt.Errorf("failed CA root lookup: %s", err) } if existing != nil { @@ -304,23 +313,22 @@ func (s *Store) CARootSetCAS(idx, cidx uint64, rs []*structs.CARoot) (bool, erro // Delete all _, err := tx.DeleteAll(tableConnectCARoots, "id") if err != nil { - return false, err + return err } // Insert all for _, r := range rs { if err := tx.Insert(tableConnectCARoots, r); err != nil { - return false, err + return err } } // Update the index if err := tx.Insert(tableIndex, &IndexEntry{tableConnectCARoots, idx}); err != nil { - return false, fmt.Errorf("failed updating index: %s", err) + return fmt.Errorf("failed updating index: %s", err) } - err = tx.Commit() - return err == nil, err + return nil } // CAProviderState is used to pull the built-in provider states from the snapshot. diff --git a/agent/consul/state/connect_ca_events.go b/agent/consul/state/connect_ca_events.go new file mode 100644 index 0000000000..e73c206b5d --- /dev/null +++ b/agent/consul/state/connect_ca_events.go @@ -0,0 +1,82 @@ +package state + +import ( + "github.com/hashicorp/consul/acl" + "github.com/hashicorp/consul/agent/consul/stream" + "github.com/hashicorp/consul/agent/structs" +) + +// EventTopicCARoots is the streaming topic to which events will be published +// when the list of active CA Roots changes. Each event payload contains the +// full list of roots. +// +// Note: topics are ordinarily defined in subscribe.proto, but this one isn't +// currently available via the Subscribe endpoint. +const EventTopicCARoots stringTopic = "CARoots" + +type stringTopic string + +func (s stringTopic) String() string { return string(s) } + +type EventPayloadCARoots struct { + CARoots structs.CARoots +} + +func (e EventPayloadCARoots) Subject() stream.Subject { return stream.SubjectNone } + +func (e EventPayloadCARoots) HasReadPermission(authz acl.Authorizer) bool { + // TODO(agentless): implement this method once the Authorizer exposes a method + // to check for `service:write` on any service. + panic("EventPayloadCARoots does not implement HasReadPermission") +} + +// caRootsChangeEvents returns an event on EventTopicCARoots whenever the list +// of active CA Roots changes. +func caRootsChangeEvents(tx ReadTxn, changes Changes) ([]stream.Event, error) { + var rootsChanged bool + for _, c := range changes.Changes { + if c.Table == tableConnectCARoots { + rootsChanged = true + break + } + } + if !rootsChanged { + return nil, nil + } + + _, roots, err := caRootsTxn(tx, nil) + if err != nil { + return nil, err + } + + return []stream.Event{ + { + Topic: EventTopicCARoots, + Index: changes.Index, + Payload: EventPayloadCARoots{CARoots: roots}, + }, + }, nil +} + +// caRootsSnapshot returns a stream.SnapshotFunc that provides a snapshot of +// the current active list of CA Roots. +func caRootsSnapshot(db ReadDB) stream.SnapshotFunc { + return func(_ stream.SubscribeRequest, buf stream.SnapshotAppender) (uint64, error) { + tx := db.ReadTxn() + defer tx.Abort() + + idx, roots, err := caRootsTxn(tx, nil) + if err != nil { + return 0, err + } + + buf.Append([]stream.Event{ + { + Topic: EventTopicCARoots, + Index: idx, + Payload: EventPayloadCARoots{CARoots: roots}, + }, + }) + return idx, nil + } +} diff --git a/agent/consul/state/connect_ca_events_test.go b/agent/consul/state/connect_ca_events_test.go new file mode 100644 index 0000000000..9e91343674 --- /dev/null +++ b/agent/consul/state/connect_ca_events_test.go @@ -0,0 +1,95 @@ +package state + +import ( + "testing" + + "github.com/stretchr/testify/require" + + "github.com/hashicorp/consul/agent/connect" + "github.com/hashicorp/consul/agent/consul/stream" + "github.com/hashicorp/consul/agent/structs" +) + +func TestCARootsEvents(t *testing.T) { + store := testStateStore(t) + rootA := connect.TestCA(t, nil) + + _, err := store.CARootSetCAS(1, 0, structs.CARoots{rootA}) + require.NoError(t, err) + + t.Run("roots changed", func(t *testing.T) { + tx := store.db.WriteTxn(2) + defer tx.Abort() + + rootB := connect.TestCA(t, nil) + err = caRootSetCASTxn(tx, 2, 1, structs.CARoots{rootB}) + require.NoError(t, err) + + events, err := caRootsChangeEvents(tx, Changes{Index: 2, Changes: tx.Changes()}) + require.NoError(t, err) + require.Equal(t, []stream.Event{ + { + Topic: EventTopicCARoots, + Index: 2, + Payload: EventPayloadCARoots{ + CARoots: structs.CARoots{rootB}, + }, + }, + }, events) + }) + + t.Run("no change", func(t *testing.T) { + tx := store.db.ReadTxn() + defer tx.Abort() + + events, err := caRootsChangeEvents(tx, Changes{Index: 2, Changes: tx.Changes()}) + require.NoError(t, err) + require.Empty(t, events) + }) +} + +func TestCARootsSnapshot(t *testing.T) { + store := testStateStore(t) + fn := caRootsSnapshot((*readDB)(store.db.db)) + + var req stream.SubscribeRequest + + t.Run("no roots", func(t *testing.T) { + buf := &snapshotAppender{} + + idx, err := fn(req, buf) + require.NoError(t, err) + require.Equal(t, uint64(0), idx) + + require.Len(t, buf.events, 1) + require.Len(t, buf.events[0], 1) + + payload := buf.events[0][0].Payload.(EventPayloadCARoots) + require.Empty(t, payload.CARoots) + }) + + t.Run("with roots", func(t *testing.T) { + buf := &snapshotAppender{} + + root := connect.TestCA(t, nil) + + _, err := store.CARootSetCAS(1, 0, structs.CARoots{root}) + require.NoError(t, err) + + idx, err := fn(req, buf) + require.NoError(t, err) + require.Equal(t, uint64(1), idx) + + require.Equal(t, buf.events, [][]stream.Event{ + { + { + Topic: EventTopicCARoots, + Index: 1, + Payload: EventPayloadCARoots{ + CARoots: structs.CARoots{root}, + }, + }, + }, + }) + }) +} diff --git a/agent/consul/state/memdb.go b/agent/consul/state/memdb.go index 871e49581f..936375eb4d 100644 --- a/agent/consul/state/memdb.go +++ b/agent/consul/state/memdb.go @@ -187,6 +187,7 @@ func processDBChanges(tx ReadTxn, changes Changes) ([]stream.Event, error) { var events []stream.Event fns := []func(tx ReadTxn, changes Changes) ([]stream.Event, error){ aclChangeUnsubscribeEvent, + caRootsChangeEvents, ServiceHealthEventsFromChanges, // TODO: add other table handlers here. } @@ -204,5 +205,6 @@ func newSnapshotHandlers(db ReadDB) stream.SnapshotHandlers { return stream.SnapshotHandlers{ topicServiceHealth: serviceHealthSnapshot(db, topicServiceHealth), topicServiceHealthConnect: serviceHealthSnapshot(db, topicServiceHealthConnect), + EventTopicCARoots: caRootsSnapshot(db), } } diff --git a/agent/consul/stream/event.go b/agent/consul/stream/event.go index 9240f65242..78e41bc375 100644 --- a/agent/consul/stream/event.go +++ b/agent/consul/stream/event.go @@ -19,6 +19,11 @@ type Topic fmt.Stringer // normalized resource name (including partition and namespace if applicable). type Subject string +// SubjectNone is used when all events on a given topic are "global" and not +// further partitioned by subject. For example: the "CA Roots" topic which is +// used to notify subscribers when the global set CA root certificates changes. +const SubjectNone Subject = "none" + // Event is a structure with identifiers and a payload. Events are Published to // EventPublisher and returned to Subscribers. type Event struct {