state: publish changes from Commit

Make topicRegistry use functions instead of unbound methods
Use a regular memDB in EventPublisher to remove a reference cycle
Removes the need for EventPublisher to use a store
This commit is contained in:
Daniel Nephin 2020-06-17 18:15:45 -04:00
parent f5ecd5de5f
commit 2a8a8f7b8d
6 changed files with 142 additions and 112 deletions

View File

@ -289,11 +289,11 @@ func (s *Store) aclTokenListGlobal(tx *txn, _ *structs.EnterpriseMeta) (memdb.Re
return tx.Get("acl-tokens", "local", false)
}
func aclTokenListByPolicy(tx *txn, policy string, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) {
func aclTokenListByPolicy(tx ReadTxn, policy string, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) {
return tx.Get("acl-tokens", "policies", policy)
}
func aclTokenListByRole(tx *txn, role string, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) {
func aclTokenListByRole(tx ReadTxn, role string, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) {
return tx.Get("acl-tokens", "roles", role)
}
@ -355,7 +355,7 @@ func (s *Store) aclRoleList(tx *txn, _ *structs.EnterpriseMeta) (memdb.ResultIte
return tx.Get("acl-roles", "id")
}
func aclRoleListByPolicy(tx *txn, policy string, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) {
func aclRoleListByPolicy(tx ReadTxn, policy string, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) {
return tx.Get("acl-roles", "policies", policy)
}

View File

@ -13,8 +13,6 @@ import (
)
type EventPublisher struct {
store *Store
// topicBufferSize controls how many trailing events we keep in memory for
// each topic to avoid needing to snapshot again for re-connecting clients
// that may have missed some events. It may be zero for no buffering (the most
@ -42,10 +40,6 @@ type EventPublisher struct {
// TODO: new struct for snapCache and snapFns and snapCacheTTL
snapCache map[stream.Topic]map[string]*stream.EventSnapshot
// snapFns is the set of snapshot functions that were registered bound to the
// state store.
snapFns map[stream.Topic]stream.SnapFn
// subsByToken stores a list of Subscription objects outstanding indexed by a
// hash of the ACL token they used to subscribe so we can reload them if their
// ACL permissions change.
@ -55,32 +49,22 @@ type EventPublisher struct {
// publishes events, so that publishing can happen asynchronously from
// the Commit call in the FSM hot path.
publishCh chan commitUpdate
handlers map[stream.Topic]topicHandler
}
type commitUpdate struct {
tx *txn
tx ReadTxn
events []stream.Event
}
func NewEventPublisher(store *Store, topicBufferSize int, snapCacheTTL time.Duration) *EventPublisher {
func NewEventPublisher(handlers map[stream.Topic]topicHandler, snapCacheTTL time.Duration) *EventPublisher {
e := &EventPublisher{
store: store,
topicBufferSize: topicBufferSize,
snapCacheTTL: snapCacheTTL,
topicBuffers: make(map[stream.Topic]*stream.EventBuffer),
snapCache: make(map[stream.Topic]map[string]*stream.EventSnapshot),
snapFns: make(map[stream.Topic]stream.SnapFn),
subsByToken: make(map[string]map[*stream.SubscribeRequest]*stream.Subscription),
publishCh: make(chan commitUpdate, 64),
}
// create a local handler table
// TODO: document why
for topic, handlers := range topicRegistry {
fnCopy := handlers.Snapshot
e.snapFns[topic] = func(req *stream.SubscribeRequest, buf *stream.EventBuffer) (uint64, error) {
return fnCopy(e.store, req, buf)
}
handlers: handlers,
}
go e.handleUpdates()
@ -88,11 +72,11 @@ func NewEventPublisher(store *Store, topicBufferSize int, snapCacheTTL time.Dura
return e
}
func (e *EventPublisher) publishChanges(tx *txn, changes memdb.Changes) error {
func (e *EventPublisher) PublishChanges(tx *txn, changes memdb.Changes) error {
var events []stream.Event
for topic, th := range topicRegistry {
if th.ProcessChanges != nil {
es, err := th.ProcessChanges(e.store, tx, changes)
for topic, handler := range e.handlers {
if handler.ProcessChanges != nil {
es, err := handler.ProcessChanges(tx, changes)
if err != nil {
return fmt.Errorf("failed generating events for topic %q: %s", topic, err)
}
@ -106,7 +90,7 @@ func (e *EventPublisher) publishChanges(tx *txn, changes memdb.Changes) error {
// thread. Transactions aren't thread safe but it's OK to create it here
// since we won't try to use it in this thread and pass it straight to the
// handler which will own it exclusively.
tx: e.store.db.Txn(false),
tx: e.db.Txn(false),
events: events,
}
return nil
@ -179,7 +163,7 @@ func (e *EventPublisher) getTopicBuffer(topic stream.Topic) *stream.EventBuffer
// handleACLUpdate handles an ACL token/policy/role update. This method assumes
// the lock is held.
func (e *EventPublisher) handleACLUpdate(tx *txn, event stream.Event) error {
func (e *EventPublisher) handleACLUpdate(tx ReadTxn, event stream.Event) error {
switch event.Topic {
case stream.Topic_ACLTokens:
token := event.Payload.(*structs.ACLToken)
@ -248,7 +232,7 @@ func (e *EventPublisher) Subscribe(
req *stream.SubscribeRequest,
) (*stream.Subscription, error) {
// Ensure we know how to make a snapshot for this topic
_, ok := topicRegistry[req.Topic]
_, ok := e.handlers[req.Topic]
if !ok {
return nil, fmt.Errorf("unknown topic %d", req.Topic)
}
@ -341,12 +325,12 @@ func (e *EventPublisher) getSnapshotLocked(req *stream.SubscribeRequest, topicHe
}
// No snap or errored snap in cache, create a new one
snapFn, ok := e.snapFns[req.Topic]
handler, ok := e.handlers[req.Topic]
if !ok {
return nil, fmt.Errorf("unknown topic %d", req.Topic)
}
snap = stream.NewEventSnapshot(req, topicHead, snapFn)
snap = stream.NewEventSnapshot(req, topicHead, handler.Snapshot)
if e.snapCacheTTL > 0 {
topicSnaps[req.Key] = snap

View File

@ -8,6 +8,7 @@ import (
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/consul/stream"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/go-memdb"
"github.com/stretchr/testify/require"
)
@ -94,21 +95,60 @@ func assertReset(t *testing.T, eventCh <-chan nextResult, allowEOS bool) {
}
}
var topicService stream.Topic = 901
func newTestTopicHandlers(s *Store) map[stream.Topic]topicHandler {
return map[stream.Topic]topicHandler{
topicService: {
ProcessChanges: func(t *txn, changes memdb.Changes) ([]stream.Event, error) {
var events []stream.Event
for _, change := range changes {
if change.Table == "services" {
service := change.After.(*structs.ServiceNode)
events = append(events, stream.Event{
Topic: topicService,
Key: service.ServiceName,
Index: t.Index,
Payload: service,
})
}
}
return events, nil
},
Snapshot: func(req *stream.SubscribeRequest, buffer *stream.EventBuffer) (uint64, error) {
idx, nodes, err := s.ServiceNodes(nil, req.Key, nil)
if err != nil {
return idx, err
}
for _, node := range nodes {
event := stream.Event{
Topic: req.Topic,
Key: req.Key,
Index: node.ModifyIndex,
Payload: node,
}
buffer.Append([]stream.Event{event})
}
return idx, nil
},
},
stream.Topic_ACLTokens: {
ProcessChanges: aclEventsFromChanges,
},
}
}
func createTokenAndWaitForACLEventPublish(t *testing.T, s *Store) *structs.ACLToken {
// Token to use during this test.
token := &structs.ACLToken{
AccessorID: "3af117a9-2233-4cf4-8ff8-3c749c9906b4",
SecretID: "4268ce0d-d7ae-4718-8613-42eba9036020",
Description: "something",
Policies: []structs.ACLTokenPolicyLink{
structs.ACLTokenPolicyLink{
ID: testPolicyID_A,
},
{ID: testPolicyID_A},
},
Roles: []structs.ACLTokenRoleLink{
structs.ACLTokenRoleLink{
ID: testRoleID_B,
},
{ID: testRoleID_B},
},
}
token.SetHash(false)
@ -123,14 +163,15 @@ func createTokenAndWaitForACLEventPublish(t *testing.T, s *Store) *structs.ACLTo
// so we know the initial token write event has been sent out before
// continuing...
subscription := &stream.SubscribeRequest{
Topic: stream.Topic_ServiceHealth,
Topic: topicService,
Key: "nope",
Token: token.SecretID,
}
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
publisher := NewEventPublisher(s.db, 0, 0)
publisher := NewEventPublisher(ctx, newTestTopicHandlers(s), 0)
s.db.publisher = publisher
sub, err := publisher.Subscribe(ctx, subscription)
require.NoError(t, err)
@ -145,25 +186,25 @@ func createTokenAndWaitForACLEventPublish(t *testing.T, s *Store) *structs.ACLTo
return token
}
func TestEventPublisher_Publish_Success(t *testing.T) {
t.Skip("TODO: replace service registration with test events")
func TestEventPublisher_PublishChangesAndSubscribe_WithSnapshot(t *testing.T) {
t.Parallel()
require := require.New(t)
s := testStateStore(t)
store, err := NewStateStore(nil)
require.NoError(err)
// Register an initial instance
reg := structs.TestRegisterRequest(t)
reg.Service.ID = "web1"
require.NoError(s.EnsureRegistration(1, reg))
require.NoError(store.EnsureRegistration(1, reg))
// Register the subscription.
subscription := &stream.SubscribeRequest{
Topic: stream.Topic_ServiceHealth,
Topic: topicService,
Key: reg.Service.Service,
}
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
publisher := NewEventPublisher(s.db, 0, 0)
publisher := NewEventPublisher(ctx, newTestTopicHandlers(store), 0)
store.db.publisher = publisher
sub, err := publisher.Subscribe(ctx, subscription)
require.NoError(err)
@ -171,8 +212,8 @@ func TestEventPublisher_Publish_Success(t *testing.T) {
// Stream should get the instance and then EndOfSnapshot
e := assertEvent(t, eventCh)
sh := e.Payload // TODO: examine payload, instead of not-nil check
require.NotNil(sh, "expected service health event, got %v", e)
srv := e.Payload.(*structs.ServiceNode)
require.Equal(srv.ServiceID, "web1")
e = assertEvent(t, eventCh)
require.True(e.IsEndOfSnapshot())
@ -180,17 +221,16 @@ func TestEventPublisher_Publish_Success(t *testing.T) {
assertNoEvent(t, eventCh)
// Add a new instance of service on a different node
reg2 := reg
reg2.Node = "node2"
require.NoError(s.EnsureRegistration(1, reg))
reg.Node = "node2"
require.NoError(store.EnsureRegistration(1, reg))
// Subscriber should see registration
e = assertEvent(t, eventCh)
sh = e.Payload // TODO: examine payload, instead of not-nil check
require.NotNil(sh, "expected service health event, got %v", e)
srv = e.Payload.(*structs.ServiceNode)
require.Equal(srv.Node, "node2")
}
func TestPublisher_ACLTokenUpdate(t *testing.T) {
func TestEventPublisher_Publish_ACLTokenUpdate(t *testing.T) {
t.Parallel()
require := require.New(t)
s := testACLTokensStateStore(t)
@ -200,14 +240,15 @@ func TestPublisher_ACLTokenUpdate(t *testing.T) {
// Register the subscription.
subscription := &stream.SubscribeRequest{
Topic: stream.Topic_ServiceHealth,
Topic: topicService,
Key: "nope",
Token: token.SecretID,
}
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
publisher := NewEventPublisher(s.db, 0, 0)
publisher := NewEventPublisher(ctx, newTestTopicHandlers(s), 0)
s.db.publisher = publisher
sub, err := publisher.Subscribe(ctx, subscription)
require.NoError(err)
@ -243,7 +284,7 @@ func TestPublisher_ACLTokenUpdate(t *testing.T) {
// Register another subscription.
subscription2 := &stream.SubscribeRequest{
Topic: stream.Topic_ServiceHealth,
Topic: topicService,
Key: "nope",
Token: token.SecretID,
}
@ -270,7 +311,7 @@ func TestPublisher_ACLTokenUpdate(t *testing.T) {
require.Equal(stream.ErrSubscriptionReload, err)
}
func TestPublisher_ACLPolicyUpdate(t *testing.T) {
func TestEventPublisher_Publish_ACLPolicyUpdate(t *testing.T) {
t.Parallel()
require := require.New(t)
s := testACLTokensStateStore(t)
@ -280,14 +321,15 @@ func TestPublisher_ACLPolicyUpdate(t *testing.T) {
// Register the subscription.
subscription := &stream.SubscribeRequest{
Topic: stream.Topic_ServiceHealth,
Topic: topicService,
Key: "nope",
Token: token.SecretID,
}
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
publisher := NewEventPublisher(s.db, 0, 0)
publisher := NewEventPublisher(ctx, newTestTopicHandlers(s), 0)
s.db.publisher = publisher
sub, err := publisher.Subscribe(ctx, subscription)
require.NoError(err)
@ -327,7 +369,7 @@ func TestPublisher_ACLPolicyUpdate(t *testing.T) {
// Register another subscription.
subscription2 := &stream.SubscribeRequest{
Topic: stream.Topic_ServiceHealth,
Topic: topicService,
Key: "nope",
Token: token.SecretID,
}
@ -355,7 +397,7 @@ func TestPublisher_ACLPolicyUpdate(t *testing.T) {
// Register another subscription.
subscription3 := &stream.SubscribeRequest{
Topic: stream.Topic_ServiceHealth,
Topic: topicService,
Key: "nope",
Token: token.SecretID,
}
@ -383,7 +425,7 @@ func TestPublisher_ACLPolicyUpdate(t *testing.T) {
assertReset(t, eventCh, true)
}
func TestPublisher_ACLRoleUpdate(t *testing.T) {
func TestEventPublisher_Publish_ACLRoleUpdate(t *testing.T) {
t.Parallel()
require := require.New(t)
s := testACLTokensStateStore(t)
@ -393,14 +435,15 @@ func TestPublisher_ACLRoleUpdate(t *testing.T) {
// Register the subscription.
subscription := &stream.SubscribeRequest{
Topic: stream.Topic_ServiceHealth,
Topic: topicService,
Key: "nope",
Token: token.SecretID,
}
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
publisher := NewEventPublisher(s.db, 0, 0)
publisher := NewEventPublisher(ctx, newTestTopicHandlers(s), 0)
s.db.publisher = publisher
sub, err := publisher.Subscribe(ctx, subscription)
require.NoError(err)
@ -436,7 +479,7 @@ func TestPublisher_ACLRoleUpdate(t *testing.T) {
// Register another subscription.
subscription2 := &stream.SubscribeRequest{
Topic: stream.Topic_ServiceHealth,
Topic: topicService,
Key: "nope",
Token: token.SecretID,
}

View File

@ -4,12 +4,22 @@ import (
"github.com/hashicorp/go-memdb"
)
// ReadTxn is implemented by memdb.Txn to perform read operations.
type ReadTxn interface {
Get(table, index string, args ...interface{}) (memdb.ResultIterator, error)
Abort()
}
// changeTrackerDB is a thin wrapper around memdb.DB which enables TrackChanges on
// all write transactions. When the transaction is committed the changes are
// sent to the eventPublisher which will create and emit change events.
type changeTrackerDB struct {
db *memdb.MemDB
// TODO(streaming): add publisher
publisher changePublisher
}
type changePublisher interface {
PublishChanges(tx *txn, changes memdb.Changes) error
}
// Txn exists to maintain backwards compatibility with memdb.DB.Txn. Preexisting
@ -44,6 +54,7 @@ func (db *changeTrackerDB) WriteTxn(idx uint64) *txn {
t := &txn{
Txn: db.db.Txn(true),
Index: idx,
publisher: db.publisher,
}
t.Txn.TrackChanges()
return t
@ -70,12 +81,13 @@ func (db *changeTrackerDB) WriteTxnRestore() *txn {
// error. Any errors from the callback would be lost, which would result in a
// missing change event, even though the state store had changed.
type txn struct {
*memdb.Txn
// Index in raft where the write is occurring. The value is zero for a
// read-only transaction, and for a WriteTxnRestore transaction.
// Index is stored so that it may be passed along to any subscribers as part
// of a change event.
Index uint64
*memdb.Txn
publisher changePublisher
}
// Commit first pushes changes to EventPublisher, then calls Commit on the
@ -85,8 +97,13 @@ type txn struct {
// by the caller. A non-nil error indicates that a commit failed and was not
// applied.
func (tx *txn) Commit() error {
// changes may be empty if this is a read-only or WriteTxnRestore transaction.
// TODO(streaming): publish changes: changes := tx.Txn.Changes()
// publisher may be nil if this is a read-only or WriteTxnRestore transaction.
// In those cases changes should also be empty.
if tx.publisher != nil {
if err := tx.publisher.PublishChanges(tx, tx.Txn.Changes()); err != nil {
return err
}
}
tx.Txn.Commit()
return nil

View File

@ -3,6 +3,7 @@ package state
import (
"errors"
"fmt"
"time"
"github.com/hashicorp/consul/agent/structs"
memdb "github.com/hashicorp/go-memdb"
@ -153,15 +154,15 @@ func NewStateStore(gc *TombstoneGC) (*Store, error) {
return nil, fmt.Errorf("Failed setting up state store: %s", err)
}
// Create and return the state store.
s := &Store{
schema: schema,
abandonCh: make(chan struct{}),
kvsGraveyard: NewGraveyard(gc),
lockDelay: NewDelay(),
}
s.db = &changeTrackerDB{
db: &changeTrackerDB{
db: db,
publisher: NewEventPublisher(newTopicHandlers(), 10*time.Second),
},
}
return s, nil
}

View File

@ -5,40 +5,25 @@ import (
memdb "github.com/hashicorp/go-memdb"
)
// unboundSnapFn is a stream.SnapFn with state store as the first argument. This
// is bound to a concrete state store instance in the EventPublisher on startup.
type unboundSnapFn func(*Store, *stream.SubscribeRequest, *stream.EventBuffer) (uint64, error)
type unboundProcessChangesFn func(*Store, *txn, memdb.Changes) ([]stream.Event, error)
// topicHandlers describes the methods needed to process a streaming
// subscription for a given topic.
type topicHandlers struct {
Snapshot unboundSnapFn
ProcessChanges unboundProcessChangesFn
// topicHandler provides functions which create stream.Events for a topic.
type topicHandler struct {
// Snapshot creates the necessary events to reproduce the current state and
// appends them to the EventBuffer.
Snapshot func(*stream.SubscribeRequest, *stream.EventBuffer) (index uint64, err error)
// ProcessChanges accepts a slice of Changes, and builds a slice of events for
// those changes.
ProcessChanges func(*txn, memdb.Changes) ([]stream.Event, error)
}
// topicRegistry is a map of topic handlers. It must only be written to during
// init().
var topicRegistry map[stream.Topic]topicHandlers
func init() {
topicRegistry = map[stream.Topic]topicHandlers{
stream.Topic_ServiceHealth: topicHandlers{
Snapshot: (*Store).ServiceHealthSnapshot,
ProcessChanges: (*Store).ServiceHealthEventsFromChanges,
},
stream.Topic_ServiceHealthConnect: topicHandlers{
Snapshot: (*Store).ServiceHealthConnectSnapshot,
// Note there is no ProcessChanges since Connect events are published by
// the same event publisher as regular health events to avoid duplicating
// lots of filtering on every commit.
},
// newTopicHandlers returns the default handlers for state change events.
func newTopicHandlers() map[stream.Topic]topicHandler {
return map[stream.Topic]topicHandler{
// For now we don't actually support subscribing to ACL* topics externally
// so these have no Snapshot methods yet. We do need to have a
// ProcessChanges func to publish the partial events on ACL changes though
// so that we can invalidate other subscriptions if their effective ACL
// permissions change.
stream.Topic_ACLTokens: topicHandlers{
stream.Topic_ACLTokens: {
ProcessChanges: aclEventsFromChanges,
},
// Note no ACLPolicies/ACLRoles defined yet because we publish all events