diff --git a/agent/consul/state/acl_oss.go b/agent/consul/state/acl_oss.go index 9003e8d902..a3e19bc2df 100644 --- a/agent/consul/state/acl_oss.go +++ b/agent/consul/state/acl_oss.go @@ -289,11 +289,11 @@ func (s *Store) aclTokenListGlobal(tx *txn, _ *structs.EnterpriseMeta) (memdb.Re return tx.Get("acl-tokens", "local", false) } -func aclTokenListByPolicy(tx *txn, policy string, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) { +func aclTokenListByPolicy(tx ReadTxn, policy string, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) { return tx.Get("acl-tokens", "policies", policy) } -func aclTokenListByRole(tx *txn, role string, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) { +func aclTokenListByRole(tx ReadTxn, role string, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) { return tx.Get("acl-tokens", "roles", role) } @@ -355,7 +355,7 @@ func (s *Store) aclRoleList(tx *txn, _ *structs.EnterpriseMeta) (memdb.ResultIte return tx.Get("acl-roles", "id") } -func aclRoleListByPolicy(tx *txn, policy string, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) { +func aclRoleListByPolicy(tx ReadTxn, policy string, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) { return tx.Get("acl-roles", "policies", policy) } diff --git a/agent/consul/state/event_publisher.go b/agent/consul/state/event_publisher.go index 654174d7b4..8a5d97349d 100644 --- a/agent/consul/state/event_publisher.go +++ b/agent/consul/state/event_publisher.go @@ -13,8 +13,6 @@ import ( ) type EventPublisher struct { - store *Store - // topicBufferSize controls how many trailing events we keep in memory for // each topic to avoid needing to snapshot again for re-connecting clients // that may have missed some events. It may be zero for no buffering (the most @@ -42,10 +40,6 @@ type EventPublisher struct { // TODO: new struct for snapCache and snapFns and snapCacheTTL snapCache map[stream.Topic]map[string]*stream.EventSnapshot - // snapFns is the set of snapshot functions that were registered bound to the - // state store. - snapFns map[stream.Topic]stream.SnapFn - // subsByToken stores a list of Subscription objects outstanding indexed by a // hash of the ACL token they used to subscribe so we can reload them if their // ACL permissions change. @@ -55,32 +49,22 @@ type EventPublisher struct { // publishes events, so that publishing can happen asynchronously from // the Commit call in the FSM hot path. publishCh chan commitUpdate + + handlers map[stream.Topic]topicHandler } type commitUpdate struct { - tx *txn + tx ReadTxn events []stream.Event } -func NewEventPublisher(store *Store, topicBufferSize int, snapCacheTTL time.Duration) *EventPublisher { +func NewEventPublisher(handlers map[stream.Topic]topicHandler, snapCacheTTL time.Duration) *EventPublisher { e := &EventPublisher{ - store: store, - topicBufferSize: topicBufferSize, - snapCacheTTL: snapCacheTTL, - topicBuffers: make(map[stream.Topic]*stream.EventBuffer), - snapCache: make(map[stream.Topic]map[string]*stream.EventSnapshot), - snapFns: make(map[stream.Topic]stream.SnapFn), - subsByToken: make(map[string]map[*stream.SubscribeRequest]*stream.Subscription), - publishCh: make(chan commitUpdate, 64), - } - - // create a local handler table - // TODO: document why - for topic, handlers := range topicRegistry { - fnCopy := handlers.Snapshot - e.snapFns[topic] = func(req *stream.SubscribeRequest, buf *stream.EventBuffer) (uint64, error) { - return fnCopy(e.store, req, buf) - } + snapCacheTTL: snapCacheTTL, + topicBuffers: make(map[stream.Topic]*stream.EventBuffer), + snapCache: make(map[stream.Topic]map[string]*stream.EventSnapshot), + publishCh: make(chan commitUpdate, 64), + handlers: handlers, } go e.handleUpdates() @@ -88,11 +72,11 @@ func NewEventPublisher(store *Store, topicBufferSize int, snapCacheTTL time.Dura return e } -func (e *EventPublisher) publishChanges(tx *txn, changes memdb.Changes) error { +func (e *EventPublisher) PublishChanges(tx *txn, changes memdb.Changes) error { var events []stream.Event - for topic, th := range topicRegistry { - if th.ProcessChanges != nil { - es, err := th.ProcessChanges(e.store, tx, changes) + for topic, handler := range e.handlers { + if handler.ProcessChanges != nil { + es, err := handler.ProcessChanges(tx, changes) if err != nil { return fmt.Errorf("failed generating events for topic %q: %s", topic, err) } @@ -106,7 +90,7 @@ func (e *EventPublisher) publishChanges(tx *txn, changes memdb.Changes) error { // thread. Transactions aren't thread safe but it's OK to create it here // since we won't try to use it in this thread and pass it straight to the // handler which will own it exclusively. - tx: e.store.db.Txn(false), + tx: e.db.Txn(false), events: events, } return nil @@ -179,7 +163,7 @@ func (e *EventPublisher) getTopicBuffer(topic stream.Topic) *stream.EventBuffer // handleACLUpdate handles an ACL token/policy/role update. This method assumes // the lock is held. -func (e *EventPublisher) handleACLUpdate(tx *txn, event stream.Event) error { +func (e *EventPublisher) handleACLUpdate(tx ReadTxn, event stream.Event) error { switch event.Topic { case stream.Topic_ACLTokens: token := event.Payload.(*structs.ACLToken) @@ -248,7 +232,7 @@ func (e *EventPublisher) Subscribe( req *stream.SubscribeRequest, ) (*stream.Subscription, error) { // Ensure we know how to make a snapshot for this topic - _, ok := topicRegistry[req.Topic] + _, ok := e.handlers[req.Topic] if !ok { return nil, fmt.Errorf("unknown topic %d", req.Topic) } @@ -341,12 +325,12 @@ func (e *EventPublisher) getSnapshotLocked(req *stream.SubscribeRequest, topicHe } // No snap or errored snap in cache, create a new one - snapFn, ok := e.snapFns[req.Topic] + handler, ok := e.handlers[req.Topic] if !ok { return nil, fmt.Errorf("unknown topic %d", req.Topic) } - snap = stream.NewEventSnapshot(req, topicHead, snapFn) + snap = stream.NewEventSnapshot(req, topicHead, handler.Snapshot) if e.snapCacheTTL > 0 { topicSnaps[req.Key] = snap diff --git a/agent/consul/state/event_publisher_test.go b/agent/consul/state/event_publisher_test.go index a0141cc434..b58eef6a64 100644 --- a/agent/consul/state/event_publisher_test.go +++ b/agent/consul/state/event_publisher_test.go @@ -8,6 +8,7 @@ import ( "github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/agent/consul/stream" "github.com/hashicorp/consul/agent/structs" + "github.com/hashicorp/go-memdb" "github.com/stretchr/testify/require" ) @@ -94,21 +95,60 @@ func assertReset(t *testing.T, eventCh <-chan nextResult, allowEOS bool) { } } +var topicService stream.Topic = 901 + +func newTestTopicHandlers(s *Store) map[stream.Topic]topicHandler { + return map[stream.Topic]topicHandler{ + topicService: { + ProcessChanges: func(t *txn, changes memdb.Changes) ([]stream.Event, error) { + var events []stream.Event + for _, change := range changes { + if change.Table == "services" { + service := change.After.(*structs.ServiceNode) + events = append(events, stream.Event{ + Topic: topicService, + Key: service.ServiceName, + Index: t.Index, + Payload: service, + }) + } + } + return events, nil + }, + Snapshot: func(req *stream.SubscribeRequest, buffer *stream.EventBuffer) (uint64, error) { + idx, nodes, err := s.ServiceNodes(nil, req.Key, nil) + if err != nil { + return idx, err + } + + for _, node := range nodes { + event := stream.Event{ + Topic: req.Topic, + Key: req.Key, + Index: node.ModifyIndex, + Payload: node, + } + buffer.Append([]stream.Event{event}) + } + return idx, nil + }, + }, + stream.Topic_ACLTokens: { + ProcessChanges: aclEventsFromChanges, + }, + } +} + func createTokenAndWaitForACLEventPublish(t *testing.T, s *Store) *structs.ACLToken { - // Token to use during this test. token := &structs.ACLToken{ AccessorID: "3af117a9-2233-4cf4-8ff8-3c749c9906b4", SecretID: "4268ce0d-d7ae-4718-8613-42eba9036020", Description: "something", Policies: []structs.ACLTokenPolicyLink{ - structs.ACLTokenPolicyLink{ - ID: testPolicyID_A, - }, + {ID: testPolicyID_A}, }, Roles: []structs.ACLTokenRoleLink{ - structs.ACLTokenRoleLink{ - ID: testRoleID_B, - }, + {ID: testRoleID_B}, }, } token.SetHash(false) @@ -123,14 +163,15 @@ func createTokenAndWaitForACLEventPublish(t *testing.T, s *Store) *structs.ACLTo // so we know the initial token write event has been sent out before // continuing... subscription := &stream.SubscribeRequest{ - Topic: stream.Topic_ServiceHealth, + Topic: topicService, Key: "nope", Token: token.SecretID, } ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() - publisher := NewEventPublisher(s.db, 0, 0) + publisher := NewEventPublisher(ctx, newTestTopicHandlers(s), 0) + s.db.publisher = publisher sub, err := publisher.Subscribe(ctx, subscription) require.NoError(t, err) @@ -145,25 +186,25 @@ func createTokenAndWaitForACLEventPublish(t *testing.T, s *Store) *structs.ACLTo return token } -func TestEventPublisher_Publish_Success(t *testing.T) { - t.Skip("TODO: replace service registration with test events") +func TestEventPublisher_PublishChangesAndSubscribe_WithSnapshot(t *testing.T) { t.Parallel() require := require.New(t) - s := testStateStore(t) + store, err := NewStateStore(nil) + require.NoError(err) - // Register an initial instance reg := structs.TestRegisterRequest(t) reg.Service.ID = "web1" - require.NoError(s.EnsureRegistration(1, reg)) + require.NoError(store.EnsureRegistration(1, reg)) // Register the subscription. subscription := &stream.SubscribeRequest{ - Topic: stream.Topic_ServiceHealth, + Topic: topicService, Key: reg.Service.Service, } ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() - publisher := NewEventPublisher(s.db, 0, 0) + publisher := NewEventPublisher(ctx, newTestTopicHandlers(store), 0) + store.db.publisher = publisher sub, err := publisher.Subscribe(ctx, subscription) require.NoError(err) @@ -171,8 +212,8 @@ func TestEventPublisher_Publish_Success(t *testing.T) { // Stream should get the instance and then EndOfSnapshot e := assertEvent(t, eventCh) - sh := e.Payload // TODO: examine payload, instead of not-nil check - require.NotNil(sh, "expected service health event, got %v", e) + srv := e.Payload.(*structs.ServiceNode) + require.Equal(srv.ServiceID, "web1") e = assertEvent(t, eventCh) require.True(e.IsEndOfSnapshot()) @@ -180,17 +221,16 @@ func TestEventPublisher_Publish_Success(t *testing.T) { assertNoEvent(t, eventCh) // Add a new instance of service on a different node - reg2 := reg - reg2.Node = "node2" - require.NoError(s.EnsureRegistration(1, reg)) + reg.Node = "node2" + require.NoError(store.EnsureRegistration(1, reg)) // Subscriber should see registration e = assertEvent(t, eventCh) - sh = e.Payload // TODO: examine payload, instead of not-nil check - require.NotNil(sh, "expected service health event, got %v", e) + srv = e.Payload.(*structs.ServiceNode) + require.Equal(srv.Node, "node2") } -func TestPublisher_ACLTokenUpdate(t *testing.T) { +func TestEventPublisher_Publish_ACLTokenUpdate(t *testing.T) { t.Parallel() require := require.New(t) s := testACLTokensStateStore(t) @@ -200,14 +240,15 @@ func TestPublisher_ACLTokenUpdate(t *testing.T) { // Register the subscription. subscription := &stream.SubscribeRequest{ - Topic: stream.Topic_ServiceHealth, + Topic: topicService, Key: "nope", Token: token.SecretID, } ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() - publisher := NewEventPublisher(s.db, 0, 0) + publisher := NewEventPublisher(ctx, newTestTopicHandlers(s), 0) + s.db.publisher = publisher sub, err := publisher.Subscribe(ctx, subscription) require.NoError(err) @@ -243,7 +284,7 @@ func TestPublisher_ACLTokenUpdate(t *testing.T) { // Register another subscription. subscription2 := &stream.SubscribeRequest{ - Topic: stream.Topic_ServiceHealth, + Topic: topicService, Key: "nope", Token: token.SecretID, } @@ -270,7 +311,7 @@ func TestPublisher_ACLTokenUpdate(t *testing.T) { require.Equal(stream.ErrSubscriptionReload, err) } -func TestPublisher_ACLPolicyUpdate(t *testing.T) { +func TestEventPublisher_Publish_ACLPolicyUpdate(t *testing.T) { t.Parallel() require := require.New(t) s := testACLTokensStateStore(t) @@ -280,14 +321,15 @@ func TestPublisher_ACLPolicyUpdate(t *testing.T) { // Register the subscription. subscription := &stream.SubscribeRequest{ - Topic: stream.Topic_ServiceHealth, + Topic: topicService, Key: "nope", Token: token.SecretID, } ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() - publisher := NewEventPublisher(s.db, 0, 0) + publisher := NewEventPublisher(ctx, newTestTopicHandlers(s), 0) + s.db.publisher = publisher sub, err := publisher.Subscribe(ctx, subscription) require.NoError(err) @@ -327,7 +369,7 @@ func TestPublisher_ACLPolicyUpdate(t *testing.T) { // Register another subscription. subscription2 := &stream.SubscribeRequest{ - Topic: stream.Topic_ServiceHealth, + Topic: topicService, Key: "nope", Token: token.SecretID, } @@ -355,7 +397,7 @@ func TestPublisher_ACLPolicyUpdate(t *testing.T) { // Register another subscription. subscription3 := &stream.SubscribeRequest{ - Topic: stream.Topic_ServiceHealth, + Topic: topicService, Key: "nope", Token: token.SecretID, } @@ -383,7 +425,7 @@ func TestPublisher_ACLPolicyUpdate(t *testing.T) { assertReset(t, eventCh, true) } -func TestPublisher_ACLRoleUpdate(t *testing.T) { +func TestEventPublisher_Publish_ACLRoleUpdate(t *testing.T) { t.Parallel() require := require.New(t) s := testACLTokensStateStore(t) @@ -393,14 +435,15 @@ func TestPublisher_ACLRoleUpdate(t *testing.T) { // Register the subscription. subscription := &stream.SubscribeRequest{ - Topic: stream.Topic_ServiceHealth, + Topic: topicService, Key: "nope", Token: token.SecretID, } ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() - publisher := NewEventPublisher(s.db, 0, 0) + publisher := NewEventPublisher(ctx, newTestTopicHandlers(s), 0) + s.db.publisher = publisher sub, err := publisher.Subscribe(ctx, subscription) require.NoError(err) @@ -436,7 +479,7 @@ func TestPublisher_ACLRoleUpdate(t *testing.T) { // Register another subscription. subscription2 := &stream.SubscribeRequest{ - Topic: stream.Topic_ServiceHealth, + Topic: topicService, Key: "nope", Token: token.SecretID, } diff --git a/agent/consul/state/memdb_wrapper.go b/agent/consul/state/memdb_wrapper.go index 263c2c9923..3b3d4875d5 100644 --- a/agent/consul/state/memdb_wrapper.go +++ b/agent/consul/state/memdb_wrapper.go @@ -4,12 +4,22 @@ import ( "github.com/hashicorp/go-memdb" ) +// ReadTxn is implemented by memdb.Txn to perform read operations. +type ReadTxn interface { + Get(table, index string, args ...interface{}) (memdb.ResultIterator, error) + Abort() +} + // changeTrackerDB is a thin wrapper around memdb.DB which enables TrackChanges on // all write transactions. When the transaction is committed the changes are // sent to the eventPublisher which will create and emit change events. type changeTrackerDB struct { - db *memdb.MemDB - // TODO(streaming): add publisher + db *memdb.MemDB + publisher changePublisher +} + +type changePublisher interface { + PublishChanges(tx *txn, changes memdb.Changes) error } // Txn exists to maintain backwards compatibility with memdb.DB.Txn. Preexisting @@ -42,8 +52,9 @@ func (db *changeTrackerDB) ReadTxn() *txn { // data directly into the DB. These cases may use WriteTxnRestore. func (db *changeTrackerDB) WriteTxn(idx uint64) *txn { t := &txn{ - Txn: db.db.Txn(true), - Index: idx, + Txn: db.db.Txn(true), + Index: idx, + publisher: db.publisher, } t.Txn.TrackChanges() return t @@ -70,12 +81,13 @@ func (db *changeTrackerDB) WriteTxnRestore() *txn { // error. Any errors from the callback would be lost, which would result in a // missing change event, even though the state store had changed. type txn struct { + *memdb.Txn // Index in raft where the write is occurring. The value is zero for a // read-only transaction, and for a WriteTxnRestore transaction. // Index is stored so that it may be passed along to any subscribers as part // of a change event. - Index uint64 - *memdb.Txn + Index uint64 + publisher changePublisher } // Commit first pushes changes to EventPublisher, then calls Commit on the @@ -85,8 +97,13 @@ type txn struct { // by the caller. A non-nil error indicates that a commit failed and was not // applied. func (tx *txn) Commit() error { - // changes may be empty if this is a read-only or WriteTxnRestore transaction. - // TODO(streaming): publish changes: changes := tx.Txn.Changes() + // publisher may be nil if this is a read-only or WriteTxnRestore transaction. + // In those cases changes should also be empty. + if tx.publisher != nil { + if err := tx.publisher.PublishChanges(tx, tx.Txn.Changes()); err != nil { + return err + } + } tx.Txn.Commit() return nil diff --git a/agent/consul/state/state_store.go b/agent/consul/state/state_store.go index 2c057f4b1a..83412bdb39 100644 --- a/agent/consul/state/state_store.go +++ b/agent/consul/state/state_store.go @@ -3,6 +3,7 @@ package state import ( "errors" "fmt" + "time" "github.com/hashicorp/consul/agent/structs" memdb "github.com/hashicorp/go-memdb" @@ -153,15 +154,15 @@ func NewStateStore(gc *TombstoneGC) (*Store, error) { return nil, fmt.Errorf("Failed setting up state store: %s", err) } - // Create and return the state store. s := &Store{ schema: schema, abandonCh: make(chan struct{}), kvsGraveyard: NewGraveyard(gc), lockDelay: NewDelay(), - } - s.db = &changeTrackerDB{ - db: db, + db: &changeTrackerDB{ + db: db, + publisher: NewEventPublisher(newTopicHandlers(), 10*time.Second), + }, } return s, nil } diff --git a/agent/consul/state/stream_topics.go b/agent/consul/state/stream_topics.go index 1e8d8a57bc..23c1468299 100644 --- a/agent/consul/state/stream_topics.go +++ b/agent/consul/state/stream_topics.go @@ -5,40 +5,25 @@ import ( memdb "github.com/hashicorp/go-memdb" ) -// unboundSnapFn is a stream.SnapFn with state store as the first argument. This -// is bound to a concrete state store instance in the EventPublisher on startup. -type unboundSnapFn func(*Store, *stream.SubscribeRequest, *stream.EventBuffer) (uint64, error) -type unboundProcessChangesFn func(*Store, *txn, memdb.Changes) ([]stream.Event, error) - -// topicHandlers describes the methods needed to process a streaming -// subscription for a given topic. -type topicHandlers struct { - Snapshot unboundSnapFn - ProcessChanges unboundProcessChangesFn +// topicHandler provides functions which create stream.Events for a topic. +type topicHandler struct { + // Snapshot creates the necessary events to reproduce the current state and + // appends them to the EventBuffer. + Snapshot func(*stream.SubscribeRequest, *stream.EventBuffer) (index uint64, err error) + // ProcessChanges accepts a slice of Changes, and builds a slice of events for + // those changes. + ProcessChanges func(*txn, memdb.Changes) ([]stream.Event, error) } -// topicRegistry is a map of topic handlers. It must only be written to during -// init(). -var topicRegistry map[stream.Topic]topicHandlers - -func init() { - topicRegistry = map[stream.Topic]topicHandlers{ - stream.Topic_ServiceHealth: topicHandlers{ - Snapshot: (*Store).ServiceHealthSnapshot, - ProcessChanges: (*Store).ServiceHealthEventsFromChanges, - }, - stream.Topic_ServiceHealthConnect: topicHandlers{ - Snapshot: (*Store).ServiceHealthConnectSnapshot, - // Note there is no ProcessChanges since Connect events are published by - // the same event publisher as regular health events to avoid duplicating - // lots of filtering on every commit. - }, +// newTopicHandlers returns the default handlers for state change events. +func newTopicHandlers() map[stream.Topic]topicHandler { + return map[stream.Topic]topicHandler{ // For now we don't actually support subscribing to ACL* topics externally // so these have no Snapshot methods yet. We do need to have a // ProcessChanges func to publish the partial events on ACL changes though // so that we can invalidate other subscriptions if their effective ACL // permissions change. - stream.Topic_ACLTokens: topicHandlers{ + stream.Topic_ACLTokens: { ProcessChanges: aclEventsFromChanges, }, // Note no ACLPolicies/ACLRoles defined yet because we publish all events