From 5f9db949564d34fa72476ae339051c5e068ba1cf Mon Sep 17 00:00:00 2001 From: Daniel Nephin Date: Mon, 6 Jul 2020 14:24:30 -0400 Subject: [PATCH] state: Use interface for Txn Also store the index in Changes instead of the Txn. This change is in preparation for movinng EventPublisher to the stream package, and making handleACLUpdates async once again. --- agent/consul/state/acl_events.go | 11 ++-- agent/consul/state/acl_events_test.go | 3 +- agent/consul/state/acl_oss.go | 7 +-- agent/consul/state/db/txn.go | 17 ++++++ agent/consul/state/event_publisher.go | 29 ++++++++--- agent/consul/state/event_publisher_test.go | 12 ++--- agent/consul/state/memdb.go | 60 ++++++++++++---------- agent/consul/state/stream_topics.go | 15 +----- 8 files changed, 94 insertions(+), 60 deletions(-) create mode 100644 agent/consul/state/db/txn.go diff --git a/agent/consul/state/acl_events.go b/agent/consul/state/acl_events.go index 94dbfb5462..f26a05b098 100644 --- a/agent/consul/state/acl_events.go +++ b/agent/consul/state/acl_events.go @@ -1,6 +1,7 @@ package state import ( + "github.com/hashicorp/consul/agent/consul/state/db" "github.com/hashicorp/consul/agent/consul/stream" "github.com/hashicorp/consul/agent/structs" memdb "github.com/hashicorp/go-memdb" @@ -9,17 +10,17 @@ import ( // ACLEventsFromChanges returns all the ACL token, policy or role events that // should be emitted given a set of changes to the state store. // TODO: Add OpDelete/OpUpdate to the event or payload? -func aclEventsFromChanges(tx *txn, changes memdb.Changes) ([]stream.Event, error) { +func aclEventsFromChanges(_ db.ReadTxn, changes db.Changes) ([]stream.Event, error) { var events []stream.Event // TODO: mapping of table->topic? - for _, change := range changes { + for _, change := range changes.Changes { switch change.Table { case "acl-tokens": token := changeObject(change).(*structs.ACLToken) e := stream.Event{ Topic: stream.Topic_ACLTokens, - Index: tx.Index, + Index: changes.Index, Payload: token, } events = append(events, e) @@ -27,7 +28,7 @@ func aclEventsFromChanges(tx *txn, changes memdb.Changes) ([]stream.Event, error policy := changeObject(change).(*structs.ACLPolicy) e := stream.Event{ Topic: stream.Topic_ACLPolicies, - Index: tx.Index, + Index: changes.Index, Payload: policy, } events = append(events, e) @@ -35,7 +36,7 @@ func aclEventsFromChanges(tx *txn, changes memdb.Changes) ([]stream.Event, error role := changeObject(change).(*structs.ACLRole) e := stream.Event{ Topic: stream.Topic_ACLRoles, - Index: tx.Index, + Index: changes.Index, Payload: role, } events = append(events, e) diff --git a/agent/consul/state/acl_events_test.go b/agent/consul/state/acl_events_test.go index 52bc5f9e76..0b8acbd4cf 100644 --- a/agent/consul/state/acl_events_test.go +++ b/agent/consul/state/acl_events_test.go @@ -5,6 +5,7 @@ import ( "strings" "testing" + "github.com/hashicorp/consul/agent/consul/state/db" "github.com/hashicorp/consul/agent/consul/stream" "github.com/hashicorp/consul/agent/structs" "github.com/stretchr/testify/require" @@ -145,7 +146,7 @@ func TestACLEventsFromChanges(t *testing.T) { // Note we call the func under test directly rather than publishChanges so // we can test this in isolation. - events, err := aclEventsFromChanges(tx, tx.Changes()) + events, err := aclEventsFromChanges(tx, db.Changes{Index: 100, Changes: tx.Changes()}) require.NoError(t, err) require.Len(t, events, 1) diff --git a/agent/consul/state/acl_oss.go b/agent/consul/state/acl_oss.go index a3e19bc2df..5a1dbe713d 100644 --- a/agent/consul/state/acl_oss.go +++ b/agent/consul/state/acl_oss.go @@ -5,6 +5,7 @@ package state import ( "fmt" + "github.com/hashicorp/consul/agent/consul/state/db" "github.com/hashicorp/consul/agent/structs" memdb "github.com/hashicorp/go-memdb" ) @@ -289,11 +290,11 @@ func (s *Store) aclTokenListGlobal(tx *txn, _ *structs.EnterpriseMeta) (memdb.Re return tx.Get("acl-tokens", "local", false) } -func aclTokenListByPolicy(tx ReadTxn, policy string, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) { +func aclTokenListByPolicy(tx db.ReadTxn, policy string, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) { return tx.Get("acl-tokens", "policies", policy) } -func aclTokenListByRole(tx ReadTxn, role string, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) { +func aclTokenListByRole(tx db.ReadTxn, role string, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) { return tx.Get("acl-tokens", "roles", role) } @@ -355,7 +356,7 @@ func (s *Store) aclRoleList(tx *txn, _ *structs.EnterpriseMeta) (memdb.ResultIte return tx.Get("acl-roles", "id") } -func aclRoleListByPolicy(tx ReadTxn, policy string, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) { +func aclRoleListByPolicy(tx db.ReadTxn, policy string, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) { return tx.Get("acl-roles", "policies", policy) } diff --git a/agent/consul/state/db/txn.go b/agent/consul/state/db/txn.go new file mode 100644 index 0000000000..a568bc43f2 --- /dev/null +++ b/agent/consul/state/db/txn.go @@ -0,0 +1,17 @@ +package db + +import "github.com/hashicorp/go-memdb" + +// ReadTxn is implemented by memdb.Txn to perform read operations. +type ReadTxn interface { + Get(table, index string, args ...interface{}) (memdb.ResultIterator, error) + Abort() +} + +// Changes wraps a memdb.Changes to include the index at which these changes +// were made. +type Changes struct { + // Index is the latest index at the time these changes were committed. + Index uint64 + Changes memdb.Changes +} diff --git a/agent/consul/state/event_publisher.go b/agent/consul/state/event_publisher.go index 6a5751f91c..c6177169c3 100644 --- a/agent/consul/state/event_publisher.go +++ b/agent/consul/state/event_publisher.go @@ -6,12 +6,14 @@ import ( "sync" "time" - "github.com/hashicorp/go-memdb" - + "github.com/hashicorp/consul/agent/consul/state/db" "github.com/hashicorp/consul/agent/consul/stream" "github.com/hashicorp/consul/agent/structs" + "github.com/hashicorp/go-memdb" ) +// EventPublisher receives changes events from Publish, and sends them to all +// registered subscribers. type EventPublisher struct { // topicBufferSize controls how many trailing events we keep in memory for // each topic to avoid needing to snapshot again for re-connecting clients @@ -47,7 +49,7 @@ type EventPublisher struct { // the Commit call in the FSM hot path. publishCh chan commitUpdate - handlers map[stream.Topic]topicHandler + handlers map[stream.Topic]TopicHandler } type subscriptions struct { @@ -63,16 +65,27 @@ type subscriptions struct { byToken map[string]map[*stream.SubscribeRequest]*stream.Subscription } +// TODO: rename type commitUpdate struct { events []stream.Event } +// TopicHandler provides functions which create stream.Events for a topic. +type TopicHandler struct { + // Snapshot creates the necessary events to reproduce the current state and + // appends them to the EventBuffer. + Snapshot func(*stream.SubscribeRequest, *stream.EventBuffer) (index uint64, err error) + // ProcessChanges accepts a slice of Changes, and builds a slice of events for + // those changes. + ProcessChanges func(db.ReadTxn, db.Changes) ([]stream.Event, error) +} + // NewEventPublisher returns an EventPublisher for publishing change events. // Handlers are used to convert the memDB changes into events. // A goroutine is run in the background to publish events to all subscribes. // Cancelling the context will shutdown the goroutine, to free resources, // and stop all publishing. -func NewEventPublisher(ctx context.Context, handlers map[stream.Topic]topicHandler, snapCacheTTL time.Duration) *EventPublisher { +func NewEventPublisher(ctx context.Context, handlers map[stream.Topic]TopicHandler, snapCacheTTL time.Duration) *EventPublisher { e := &EventPublisher{ snapCacheTTL: snapCacheTTL, topicBuffers: make(map[stream.Topic]*stream.EventBuffer), @@ -89,7 +102,10 @@ func NewEventPublisher(ctx context.Context, handlers map[stream.Topic]topicHandl return e } -func (e *EventPublisher) PublishChanges(tx *txn, changes memdb.Changes) error { +// PublishChanges to all subscribers. tx is a read-only transaction that may be +// used from a goroutine. The caller should never use the tx once it has been +// passed to PublishChanged. +func (e *EventPublisher) PublishChanges(tx db.ReadTxn, changes db.Changes) error { var events []stream.Event for topic, handler := range e.handlers { if handler.ProcessChanges != nil { @@ -101,6 +117,7 @@ func (e *EventPublisher) PublishChanges(tx *txn, changes memdb.Changes) error { } } + // TODO: call tx.Abort when this is done with tx. for _, event := range events { // If the event is an ACL update, treat it as a special case. Currently // ACL update events are only used internally to recognize when a subscriber @@ -168,7 +185,7 @@ func (e *EventPublisher) getTopicBuffer(topic stream.Topic) *stream.EventBuffer } // handleACLUpdate handles an ACL token/policy/role update. -func (s *subscriptions) handleACLUpdate(tx ReadTxn, event stream.Event) error { +func (s *subscriptions) handleACLUpdate(tx db.ReadTxn, event stream.Event) error { s.lock.RLock() defer s.lock.RUnlock() diff --git a/agent/consul/state/event_publisher_test.go b/agent/consul/state/event_publisher_test.go index b58eef6a64..5c41089665 100644 --- a/agent/consul/state/event_publisher_test.go +++ b/agent/consul/state/event_publisher_test.go @@ -6,9 +6,9 @@ import ( "time" "github.com/hashicorp/consul/acl" + "github.com/hashicorp/consul/agent/consul/state/db" "github.com/hashicorp/consul/agent/consul/stream" "github.com/hashicorp/consul/agent/structs" - "github.com/hashicorp/go-memdb" "github.com/stretchr/testify/require" ) @@ -97,18 +97,18 @@ func assertReset(t *testing.T, eventCh <-chan nextResult, allowEOS bool) { var topicService stream.Topic = 901 -func newTestTopicHandlers(s *Store) map[stream.Topic]topicHandler { - return map[stream.Topic]topicHandler{ +func newTestTopicHandlers(s *Store) map[stream.Topic]TopicHandler { + return map[stream.Topic]TopicHandler{ topicService: { - ProcessChanges: func(t *txn, changes memdb.Changes) ([]stream.Event, error) { + ProcessChanges: func(tx db.ReadTxn, changes db.Changes) ([]stream.Event, error) { var events []stream.Event - for _, change := range changes { + for _, change := range changes.Changes { if change.Table == "services" { service := change.After.(*structs.ServiceNode) events = append(events, stream.Event{ Topic: topicService, Key: service.ServiceName, - Index: t.Index, + Index: changes.Index, Payload: service, }) } diff --git a/agent/consul/state/memdb.go b/agent/consul/state/memdb.go index 3b3d4875d5..864694b532 100644 --- a/agent/consul/state/memdb.go +++ b/agent/consul/state/memdb.go @@ -1,15 +1,10 @@ package state import ( + "github.com/hashicorp/consul/agent/consul/state/db" "github.com/hashicorp/go-memdb" ) -// ReadTxn is implemented by memdb.Txn to perform read operations. -type ReadTxn interface { - Get(table, index string, args ...interface{}) (memdb.ResultIterator, error) - Abort() -} - // changeTrackerDB is a thin wrapper around memdb.DB which enables TrackChanges on // all write transactions. When the transaction is committed the changes are // sent to the eventPublisher which will create and emit change events. @@ -19,7 +14,7 @@ type changeTrackerDB struct { } type changePublisher interface { - PublishChanges(tx *txn, changes memdb.Changes) error + PublishChanges(tx db.ReadTxn, changes db.Changes) error } // Txn exists to maintain backwards compatibility with memdb.DB.Txn. Preexisting @@ -27,17 +22,20 @@ type changePublisher interface { // with write=true. // // Deprecated: use either ReadTxn, or WriteTxn. -func (db *changeTrackerDB) Txn(write bool) *txn { +func (c *changeTrackerDB) Txn(write bool) *txn { if write { panic("don't use db.Txn(true), use db.WriteTxn(idx uin64)") } - return db.ReadTxn() + return c.ReadTxn() } // ReadTxn returns a read-only transaction which behaves exactly the same as // memdb.Txn -func (db *changeTrackerDB) ReadTxn() *txn { - return &txn{Txn: db.db.Txn(false)} +// +// TODO: this could return a regular memdb.Txn if all the state functions accepted +// and interface +func (c *changeTrackerDB) ReadTxn() *txn { + return &txn{Txn: c.db.Txn(false)} } // WriteTxn returns a wrapped memdb.Txn suitable for writes to the state store. @@ -50,11 +48,17 @@ func (db *changeTrackerDB) ReadTxn() *txn { // The exceptional cases are transactions that are executed on an empty // memdb.DB as part of Restore, and those executed by tests where we insert // data directly into the DB. These cases may use WriteTxnRestore. -func (db *changeTrackerDB) WriteTxn(idx uint64) *txn { +func (c *changeTrackerDB) WriteTxn(idx uint64) *txn { t := &txn{ - Txn: db.db.Txn(true), - Index: idx, - publisher: db.publisher, + Txn: c.db.Txn(true), + Index: idx, + publish: func(changes db.Changes) error { + // publish provides a new read-only Txn to PublishChanges so that + // events can be constructed from the current state at the time of + // Commit, and so that operations can be performed in a goroutine + // after this WriteTxn is committed. + return c.publisher.PublishChanges(c.db.Txn(false), changes) + }, } t.Txn.TrackChanges() return t @@ -66,12 +70,11 @@ func (db *changeTrackerDB) WriteTxn(idx uint64) *txn { // WriteTxnRestore uses a zero index since the whole restore doesn't really occur // at one index - the effect is to write many values that were previously // written across many indexes. -func (db *changeTrackerDB) WriteTxnRestore() *txn { - t := &txn{ - Txn: db.db.Txn(true), +func (c *changeTrackerDB) WriteTxnRestore() *txn { + return &txn{ + Txn: c.db.Txn(true), Index: 0, } - return t } // txn wraps a memdb.Txn to capture changes and send them to the EventPublisher. @@ -83,11 +86,11 @@ func (db *changeTrackerDB) WriteTxnRestore() *txn { type txn struct { *memdb.Txn // Index in raft where the write is occurring. The value is zero for a - // read-only transaction, and for a WriteTxnRestore transaction. + // read-only, and WriteTxnRestore transaction. // Index is stored so that it may be passed along to any subscribers as part // of a change event. - Index uint64 - publisher changePublisher + Index uint64 + publish func(changes db.Changes) error } // Commit first pushes changes to EventPublisher, then calls Commit on the @@ -97,10 +100,15 @@ type txn struct { // by the caller. A non-nil error indicates that a commit failed and was not // applied. func (tx *txn) Commit() error { - // publisher may be nil if this is a read-only or WriteTxnRestore transaction. - // In those cases changes should also be empty. - if tx.publisher != nil { - if err := tx.publisher.PublishChanges(tx, tx.Txn.Changes()); err != nil { + // publish may be nil if this is a read-only or WriteTxnRestore transaction. + // In those cases changes should also be empty, and there will be nothing + // to publish. + if tx.publish != nil { + changes := db.Changes{ + Index: tx.Index, + Changes: tx.Txn.Changes(), + } + if err := tx.publish(changes); err != nil { return err } } diff --git a/agent/consul/state/stream_topics.go b/agent/consul/state/stream_topics.go index 23c1468299..0d7f30708f 100644 --- a/agent/consul/state/stream_topics.go +++ b/agent/consul/state/stream_topics.go @@ -2,22 +2,11 @@ package state import ( "github.com/hashicorp/consul/agent/consul/stream" - memdb "github.com/hashicorp/go-memdb" ) -// topicHandler provides functions which create stream.Events for a topic. -type topicHandler struct { - // Snapshot creates the necessary events to reproduce the current state and - // appends them to the EventBuffer. - Snapshot func(*stream.SubscribeRequest, *stream.EventBuffer) (index uint64, err error) - // ProcessChanges accepts a slice of Changes, and builds a slice of events for - // those changes. - ProcessChanges func(*txn, memdb.Changes) ([]stream.Event, error) -} - // newTopicHandlers returns the default handlers for state change events. -func newTopicHandlers() map[stream.Topic]topicHandler { - return map[stream.Topic]topicHandler{ +func newTopicHandlers() map[stream.Topic]TopicHandler { + return map[stream.Topic]TopicHandler{ // For now we don't actually support subscribing to ACL* topics externally // so these have no Snapshot methods yet. We do need to have a // ProcessChanges func to publish the partial events on ACL changes though