diff --git a/agent/consul/state/acl_events.go b/agent/consul/state/acl_events.go index 94dbfb5462..f26a05b098 100644 --- a/agent/consul/state/acl_events.go +++ b/agent/consul/state/acl_events.go @@ -1,6 +1,7 @@ package state import ( + "github.com/hashicorp/consul/agent/consul/state/db" "github.com/hashicorp/consul/agent/consul/stream" "github.com/hashicorp/consul/agent/structs" memdb "github.com/hashicorp/go-memdb" @@ -9,17 +10,17 @@ import ( // ACLEventsFromChanges returns all the ACL token, policy or role events that // should be emitted given a set of changes to the state store. // TODO: Add OpDelete/OpUpdate to the event or payload? -func aclEventsFromChanges(tx *txn, changes memdb.Changes) ([]stream.Event, error) { +func aclEventsFromChanges(_ db.ReadTxn, changes db.Changes) ([]stream.Event, error) { var events []stream.Event // TODO: mapping of table->topic? - for _, change := range changes { + for _, change := range changes.Changes { switch change.Table { case "acl-tokens": token := changeObject(change).(*structs.ACLToken) e := stream.Event{ Topic: stream.Topic_ACLTokens, - Index: tx.Index, + Index: changes.Index, Payload: token, } events = append(events, e) @@ -27,7 +28,7 @@ func aclEventsFromChanges(tx *txn, changes memdb.Changes) ([]stream.Event, error policy := changeObject(change).(*structs.ACLPolicy) e := stream.Event{ Topic: stream.Topic_ACLPolicies, - Index: tx.Index, + Index: changes.Index, Payload: policy, } events = append(events, e) @@ -35,7 +36,7 @@ func aclEventsFromChanges(tx *txn, changes memdb.Changes) ([]stream.Event, error role := changeObject(change).(*structs.ACLRole) e := stream.Event{ Topic: stream.Topic_ACLRoles, - Index: tx.Index, + Index: changes.Index, Payload: role, } events = append(events, e) diff --git a/agent/consul/state/acl_events_test.go b/agent/consul/state/acl_events_test.go index 52bc5f9e76..0b8acbd4cf 100644 --- a/agent/consul/state/acl_events_test.go +++ b/agent/consul/state/acl_events_test.go @@ -5,6 +5,7 @@ import ( "strings" "testing" + "github.com/hashicorp/consul/agent/consul/state/db" "github.com/hashicorp/consul/agent/consul/stream" "github.com/hashicorp/consul/agent/structs" "github.com/stretchr/testify/require" @@ -145,7 +146,7 @@ func TestACLEventsFromChanges(t *testing.T) { // Note we call the func under test directly rather than publishChanges so // we can test this in isolation. - events, err := aclEventsFromChanges(tx, tx.Changes()) + events, err := aclEventsFromChanges(tx, db.Changes{Index: 100, Changes: tx.Changes()}) require.NoError(t, err) require.Len(t, events, 1) diff --git a/agent/consul/state/acl_oss.go b/agent/consul/state/acl_oss.go index a3e19bc2df..5a1dbe713d 100644 --- a/agent/consul/state/acl_oss.go +++ b/agent/consul/state/acl_oss.go @@ -5,6 +5,7 @@ package state import ( "fmt" + "github.com/hashicorp/consul/agent/consul/state/db" "github.com/hashicorp/consul/agent/structs" memdb "github.com/hashicorp/go-memdb" ) @@ -289,11 +290,11 @@ func (s *Store) aclTokenListGlobal(tx *txn, _ *structs.EnterpriseMeta) (memdb.Re return tx.Get("acl-tokens", "local", false) } -func aclTokenListByPolicy(tx ReadTxn, policy string, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) { +func aclTokenListByPolicy(tx db.ReadTxn, policy string, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) { return tx.Get("acl-tokens", "policies", policy) } -func aclTokenListByRole(tx ReadTxn, role string, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) { +func aclTokenListByRole(tx db.ReadTxn, role string, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) { return tx.Get("acl-tokens", "roles", role) } @@ -355,7 +356,7 @@ func (s *Store) aclRoleList(tx *txn, _ *structs.EnterpriseMeta) (memdb.ResultIte return tx.Get("acl-roles", "id") } -func aclRoleListByPolicy(tx ReadTxn, policy string, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) { +func aclRoleListByPolicy(tx db.ReadTxn, policy string, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) { return tx.Get("acl-roles", "policies", policy) } diff --git a/agent/consul/state/db/txn.go b/agent/consul/state/db/txn.go new file mode 100644 index 0000000000..a568bc43f2 --- /dev/null +++ b/agent/consul/state/db/txn.go @@ -0,0 +1,17 @@ +package db + +import "github.com/hashicorp/go-memdb" + +// ReadTxn is implemented by memdb.Txn to perform read operations. +type ReadTxn interface { + Get(table, index string, args ...interface{}) (memdb.ResultIterator, error) + Abort() +} + +// Changes wraps a memdb.Changes to include the index at which these changes +// were made. +type Changes struct { + // Index is the latest index at the time these changes were committed. + Index uint64 + Changes memdb.Changes +} diff --git a/agent/consul/state/event_publisher.go b/agent/consul/state/event_publisher.go index 6a5751f91c..c6177169c3 100644 --- a/agent/consul/state/event_publisher.go +++ b/agent/consul/state/event_publisher.go @@ -6,12 +6,14 @@ import ( "sync" "time" - "github.com/hashicorp/go-memdb" - + "github.com/hashicorp/consul/agent/consul/state/db" "github.com/hashicorp/consul/agent/consul/stream" "github.com/hashicorp/consul/agent/structs" + "github.com/hashicorp/go-memdb" ) +// EventPublisher receives changes events from Publish, and sends them to all +// registered subscribers. type EventPublisher struct { // topicBufferSize controls how many trailing events we keep in memory for // each topic to avoid needing to snapshot again for re-connecting clients @@ -47,7 +49,7 @@ type EventPublisher struct { // the Commit call in the FSM hot path. publishCh chan commitUpdate - handlers map[stream.Topic]topicHandler + handlers map[stream.Topic]TopicHandler } type subscriptions struct { @@ -63,16 +65,27 @@ type subscriptions struct { byToken map[string]map[*stream.SubscribeRequest]*stream.Subscription } +// TODO: rename type commitUpdate struct { events []stream.Event } +// TopicHandler provides functions which create stream.Events for a topic. +type TopicHandler struct { + // Snapshot creates the necessary events to reproduce the current state and + // appends them to the EventBuffer. + Snapshot func(*stream.SubscribeRequest, *stream.EventBuffer) (index uint64, err error) + // ProcessChanges accepts a slice of Changes, and builds a slice of events for + // those changes. + ProcessChanges func(db.ReadTxn, db.Changes) ([]stream.Event, error) +} + // NewEventPublisher returns an EventPublisher for publishing change events. // Handlers are used to convert the memDB changes into events. // A goroutine is run in the background to publish events to all subscribes. // Cancelling the context will shutdown the goroutine, to free resources, // and stop all publishing. -func NewEventPublisher(ctx context.Context, handlers map[stream.Topic]topicHandler, snapCacheTTL time.Duration) *EventPublisher { +func NewEventPublisher(ctx context.Context, handlers map[stream.Topic]TopicHandler, snapCacheTTL time.Duration) *EventPublisher { e := &EventPublisher{ snapCacheTTL: snapCacheTTL, topicBuffers: make(map[stream.Topic]*stream.EventBuffer), @@ -89,7 +102,10 @@ func NewEventPublisher(ctx context.Context, handlers map[stream.Topic]topicHandl return e } -func (e *EventPublisher) PublishChanges(tx *txn, changes memdb.Changes) error { +// PublishChanges to all subscribers. tx is a read-only transaction that may be +// used from a goroutine. The caller should never use the tx once it has been +// passed to PublishChanged. +func (e *EventPublisher) PublishChanges(tx db.ReadTxn, changes db.Changes) error { var events []stream.Event for topic, handler := range e.handlers { if handler.ProcessChanges != nil { @@ -101,6 +117,7 @@ func (e *EventPublisher) PublishChanges(tx *txn, changes memdb.Changes) error { } } + // TODO: call tx.Abort when this is done with tx. for _, event := range events { // If the event is an ACL update, treat it as a special case. Currently // ACL update events are only used internally to recognize when a subscriber @@ -168,7 +185,7 @@ func (e *EventPublisher) getTopicBuffer(topic stream.Topic) *stream.EventBuffer } // handleACLUpdate handles an ACL token/policy/role update. -func (s *subscriptions) handleACLUpdate(tx ReadTxn, event stream.Event) error { +func (s *subscriptions) handleACLUpdate(tx db.ReadTxn, event stream.Event) error { s.lock.RLock() defer s.lock.RUnlock() diff --git a/agent/consul/state/event_publisher_test.go b/agent/consul/state/event_publisher_test.go index b58eef6a64..5c41089665 100644 --- a/agent/consul/state/event_publisher_test.go +++ b/agent/consul/state/event_publisher_test.go @@ -6,9 +6,9 @@ import ( "time" "github.com/hashicorp/consul/acl" + "github.com/hashicorp/consul/agent/consul/state/db" "github.com/hashicorp/consul/agent/consul/stream" "github.com/hashicorp/consul/agent/structs" - "github.com/hashicorp/go-memdb" "github.com/stretchr/testify/require" ) @@ -97,18 +97,18 @@ func assertReset(t *testing.T, eventCh <-chan nextResult, allowEOS bool) { var topicService stream.Topic = 901 -func newTestTopicHandlers(s *Store) map[stream.Topic]topicHandler { - return map[stream.Topic]topicHandler{ +func newTestTopicHandlers(s *Store) map[stream.Topic]TopicHandler { + return map[stream.Topic]TopicHandler{ topicService: { - ProcessChanges: func(t *txn, changes memdb.Changes) ([]stream.Event, error) { + ProcessChanges: func(tx db.ReadTxn, changes db.Changes) ([]stream.Event, error) { var events []stream.Event - for _, change := range changes { + for _, change := range changes.Changes { if change.Table == "services" { service := change.After.(*structs.ServiceNode) events = append(events, stream.Event{ Topic: topicService, Key: service.ServiceName, - Index: t.Index, + Index: changes.Index, Payload: service, }) } diff --git a/agent/consul/state/memdb.go b/agent/consul/state/memdb.go index 3b3d4875d5..864694b532 100644 --- a/agent/consul/state/memdb.go +++ b/agent/consul/state/memdb.go @@ -1,15 +1,10 @@ package state import ( + "github.com/hashicorp/consul/agent/consul/state/db" "github.com/hashicorp/go-memdb" ) -// ReadTxn is implemented by memdb.Txn to perform read operations. -type ReadTxn interface { - Get(table, index string, args ...interface{}) (memdb.ResultIterator, error) - Abort() -} - // changeTrackerDB is a thin wrapper around memdb.DB which enables TrackChanges on // all write transactions. When the transaction is committed the changes are // sent to the eventPublisher which will create and emit change events. @@ -19,7 +14,7 @@ type changeTrackerDB struct { } type changePublisher interface { - PublishChanges(tx *txn, changes memdb.Changes) error + PublishChanges(tx db.ReadTxn, changes db.Changes) error } // Txn exists to maintain backwards compatibility with memdb.DB.Txn. Preexisting @@ -27,17 +22,20 @@ type changePublisher interface { // with write=true. // // Deprecated: use either ReadTxn, or WriteTxn. -func (db *changeTrackerDB) Txn(write bool) *txn { +func (c *changeTrackerDB) Txn(write bool) *txn { if write { panic("don't use db.Txn(true), use db.WriteTxn(idx uin64)") } - return db.ReadTxn() + return c.ReadTxn() } // ReadTxn returns a read-only transaction which behaves exactly the same as // memdb.Txn -func (db *changeTrackerDB) ReadTxn() *txn { - return &txn{Txn: db.db.Txn(false)} +// +// TODO: this could return a regular memdb.Txn if all the state functions accepted +// and interface +func (c *changeTrackerDB) ReadTxn() *txn { + return &txn{Txn: c.db.Txn(false)} } // WriteTxn returns a wrapped memdb.Txn suitable for writes to the state store. @@ -50,11 +48,17 @@ func (db *changeTrackerDB) ReadTxn() *txn { // The exceptional cases are transactions that are executed on an empty // memdb.DB as part of Restore, and those executed by tests where we insert // data directly into the DB. These cases may use WriteTxnRestore. -func (db *changeTrackerDB) WriteTxn(idx uint64) *txn { +func (c *changeTrackerDB) WriteTxn(idx uint64) *txn { t := &txn{ - Txn: db.db.Txn(true), - Index: idx, - publisher: db.publisher, + Txn: c.db.Txn(true), + Index: idx, + publish: func(changes db.Changes) error { + // publish provides a new read-only Txn to PublishChanges so that + // events can be constructed from the current state at the time of + // Commit, and so that operations can be performed in a goroutine + // after this WriteTxn is committed. + return c.publisher.PublishChanges(c.db.Txn(false), changes) + }, } t.Txn.TrackChanges() return t @@ -66,12 +70,11 @@ func (db *changeTrackerDB) WriteTxn(idx uint64) *txn { // WriteTxnRestore uses a zero index since the whole restore doesn't really occur // at one index - the effect is to write many values that were previously // written across many indexes. -func (db *changeTrackerDB) WriteTxnRestore() *txn { - t := &txn{ - Txn: db.db.Txn(true), +func (c *changeTrackerDB) WriteTxnRestore() *txn { + return &txn{ + Txn: c.db.Txn(true), Index: 0, } - return t } // txn wraps a memdb.Txn to capture changes and send them to the EventPublisher. @@ -83,11 +86,11 @@ func (db *changeTrackerDB) WriteTxnRestore() *txn { type txn struct { *memdb.Txn // Index in raft where the write is occurring. The value is zero for a - // read-only transaction, and for a WriteTxnRestore transaction. + // read-only, and WriteTxnRestore transaction. // Index is stored so that it may be passed along to any subscribers as part // of a change event. - Index uint64 - publisher changePublisher + Index uint64 + publish func(changes db.Changes) error } // Commit first pushes changes to EventPublisher, then calls Commit on the @@ -97,10 +100,15 @@ type txn struct { // by the caller. A non-nil error indicates that a commit failed and was not // applied. func (tx *txn) Commit() error { - // publisher may be nil if this is a read-only or WriteTxnRestore transaction. - // In those cases changes should also be empty. - if tx.publisher != nil { - if err := tx.publisher.PublishChanges(tx, tx.Txn.Changes()); err != nil { + // publish may be nil if this is a read-only or WriteTxnRestore transaction. + // In those cases changes should also be empty, and there will be nothing + // to publish. + if tx.publish != nil { + changes := db.Changes{ + Index: tx.Index, + Changes: tx.Txn.Changes(), + } + if err := tx.publish(changes); err != nil { return err } } diff --git a/agent/consul/state/stream_topics.go b/agent/consul/state/stream_topics.go index 23c1468299..0d7f30708f 100644 --- a/agent/consul/state/stream_topics.go +++ b/agent/consul/state/stream_topics.go @@ -2,22 +2,11 @@ package state import ( "github.com/hashicorp/consul/agent/consul/stream" - memdb "github.com/hashicorp/go-memdb" ) -// topicHandler provides functions which create stream.Events for a topic. -type topicHandler struct { - // Snapshot creates the necessary events to reproduce the current state and - // appends them to the EventBuffer. - Snapshot func(*stream.SubscribeRequest, *stream.EventBuffer) (index uint64, err error) - // ProcessChanges accepts a slice of Changes, and builds a slice of events for - // those changes. - ProcessChanges func(*txn, memdb.Changes) ([]stream.Event, error) -} - // newTopicHandlers returns the default handlers for state change events. -func newTopicHandlers() map[stream.Topic]topicHandler { - return map[stream.Topic]topicHandler{ +func newTopicHandlers() map[stream.Topic]TopicHandler { + return map[stream.Topic]TopicHandler{ // For now we don't actually support subscribing to ACL* topics externally // so these have no Snapshot methods yet. We do need to have a // ProcessChanges func to publish the partial events on ACL changes though