Merge pull request #8160 from hashicorp/streaming/add-event-publisher

streaming: add EventPublisher and stream package
Daniel Nephin 2020-07-15 12:46:26 -04:00 committed by GitHub
commit 41b0bf9571
18 changed files with 2159 additions and 28 deletions


@ -968,6 +968,8 @@ func (s *Server) Shutdown() error {
s.config.NotifyShutdown()
}
s.fsm.State().Abandon()
return nil
}


@ -869,11 +869,11 @@ func (s *Store) ACLTokenList(ws memdb.WatchSet, local, global bool, policy, role
}
} else if policy != "" && role == "" && methodName == "" {
iter, err = s.aclTokenListByPolicy(tx, policy, entMeta)
iter, err = aclTokenListByPolicy(tx, policy, entMeta)
needLocalityFilter = true
} else if policy == "" && role != "" && methodName == "" {
iter, err = s.aclTokenListByRole(tx, role, entMeta)
iter, err = aclTokenListByRole(tx, role, entMeta)
needLocalityFilter = true
} else if policy == "" && role == "" && methodName != "" {
@ -1464,7 +1464,7 @@ func (s *Store) ACLRoleList(ws memdb.WatchSet, policy string, entMeta *structs.E
var err error
if policy != "" {
iter, err = s.aclRoleListByPolicy(tx, policy, entMeta)
iter, err = aclRoleListByPolicy(tx, policy, entMeta)
} else {
iter, err = s.aclRoleList(tx, entMeta)
}


@ -0,0 +1,73 @@
package state
import (
"github.com/hashicorp/consul/agent/consul/stream"
"github.com/hashicorp/consul/agent/structs"
memdb "github.com/hashicorp/go-memdb"
)
// aclChangeUnsubscribeEvent returns a stream.Event (built with NewCloseSubscriptionEvent)
// that is used to unsubscribe any subscriptions created with tokens affected by the changes.
//
// These are special events that will never be returned to a subscriber.
func aclChangeUnsubscribeEvent(tx ReadTxn, changes Changes) ([]stream.Event, error) {
var secretIDs []string
for _, change := range changes.Changes {
switch change.Table {
case "acl-tokens":
token := changeObject(change).(*structs.ACLToken)
secretIDs = append(secretIDs, token.SecretID)
case "acl-roles":
role := changeObject(change).(*structs.ACLRole)
tokens, err := aclTokenListByRole(tx, role.ID, &role.EnterpriseMeta)
if err != nil {
return nil, err
}
secretIDs = appendSecretIDsFromTokenIterator(secretIDs, tokens)
case "acl-policies":
policy := changeObject(change).(*structs.ACLPolicy)
tokens, err := aclTokenListByPolicy(tx, policy.ID, &policy.EnterpriseMeta)
if err != nil {
return nil, err
}
secretIDs = appendSecretIDsFromTokenIterator(secretIDs, tokens)
roles, err := aclRoleListByPolicy(tx, policy.ID, &policy.EnterpriseMeta)
if err != nil {
return nil, err
}
for role := roles.Next(); role != nil; role = roles.Next() {
role := role.(*structs.ACLRole)
tokens, err := aclTokenListByRole(tx, role.ID, &policy.EnterpriseMeta)
if err != nil {
return nil, err
}
secretIDs = appendSecretIDsFromTokenIterator(secretIDs, tokens)
}
}
}
// There may be duplicate secretIDs here. We rely on this event allowing
// for duplicate IDs.
return []stream.Event{stream.NewCloseSubscriptionEvent(secretIDs)}, nil
}
// changeObject returns the object before it was deleted if the change was a delete,
// otherwise returns the object after the change.
func changeObject(change memdb.Change) interface{} {
if change.Deleted() {
return change.Before
}
return change.After
}
func appendSecretIDsFromTokenIterator(seq []string, tokens memdb.ResultIterator) []string {
for token := tokens.Next(); token != nil; token = tokens.Next() {
token := token.(*structs.ACLToken)
seq = append(seq, token.SecretID)
}
return seq
}
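
A rough wiring sketch (not part of the diff, written as if inside the state package since these helpers are unexported): the change-processing hook turns ACL-related changes into a single close-subscription event, which the publisher uses to drop any subscription whose request token matches one of the secret IDs. publishACLChanges and its arguments are hypothetical placeholders; the real wiring is done by changeTrackerDB.publish later in this diff.

func publishACLChanges(tx *txn, publisher eventPublisher) error {
    events, err := aclChangeUnsubscribeEvent(tx, Changes{Index: tx.Index, Changes: tx.Changes()})
    if err != nil {
        return err
    }
    // The single event carries every affected secret ID; subscriptions created
    // with those tokens are force-closed rather than receiving the event.
    publisher.Publish(events)
    return nil
}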


@ -0,0 +1,203 @@
package state
import (
"strconv"
"strings"
"testing"
"github.com/hashicorp/consul/agent/consul/stream"
"github.com/hashicorp/consul/agent/structs"
"github.com/stretchr/testify/require"
)
func TestACLChangeUnsubscribeEvent(t *testing.T) {
cases := []struct {
Name string
Setup func(s *Store, tx *txn) error
Mutate func(s *Store, tx *txn) error
expected stream.Event
}{
{
Name: "token create",
Mutate: func(s *Store, tx *txn) error {
return s.aclTokenSetTxn(tx, tx.Index, newACLToken(1), false, false, false, false)
},
expected: stream.NewCloseSubscriptionEvent(newSecretIDs(1)),
},
{
Name: "token update",
Setup: func(s *Store, tx *txn) error {
return s.aclTokenSetTxn(tx, tx.Index, newACLToken(1), false, false, false, false)
},
Mutate: func(s *Store, tx *txn) error {
// Add a policy to the token. The policy doesn't need to exist for now; we
// allow that in the set call below.
token := newACLToken(1)
token.Policies = []structs.ACLTokenPolicyLink{{ID: "33333333-1111-1111-1111-111111111111"}}
return s.aclTokenSetTxn(tx, tx.Index, token, false, true, false, false)
},
expected: stream.NewCloseSubscriptionEvent(newSecretIDs(1)),
},
{
Name: "token delete",
Setup: func(s *Store, tx *txn) error {
return s.aclTokenSetTxn(tx, tx.Index, newACLToken(1), false, false, false, false)
},
Mutate: func(s *Store, tx *txn) error {
token := newACLToken(1)
return s.aclTokenDeleteTxn(tx, tx.Index, token.AccessorID, "id", nil)
},
expected: stream.NewCloseSubscriptionEvent(newSecretIDs(1)),
},
{
Name: "policy create",
Mutate: newACLPolicyWithSingleToken,
// two identical tokens, because Mutate has two changes
expected: stream.NewCloseSubscriptionEvent(newSecretIDs(1, 1)),
},
{
Name: "policy update",
Setup: newACLPolicyWithSingleToken,
Mutate: func(s *Store, tx *txn) error {
policy := newACLPolicy(1)
policy.Rules = `operator = "write"`
return s.aclPolicySetTxn(tx, tx.Index, policy)
},
expected: stream.NewCloseSubscriptionEvent(newSecretIDs(1)),
},
{
Name: "policy delete",
Setup: newACLPolicyWithSingleToken,
Mutate: func(s *Store, tx *txn) error {
policy := newACLPolicy(1)
return s.aclPolicyDeleteTxn(tx, tx.Index, policy.ID, s.aclPolicyGetByID, nil)
},
expected: stream.NewCloseSubscriptionEvent(newSecretIDs(1)),
},
{
Name: "role create",
Mutate: newACLRoleWithSingleToken,
// Two tokens with the same ID, because there are two changes in Mutate
expected: stream.NewCloseSubscriptionEvent(newSecretIDs(1, 1)),
},
{
Name: "role update",
Setup: newACLRoleWithSingleToken,
Mutate: func(s *Store, tx *txn) error {
role := newACLRole(1, newACLRolePolicyLink(1))
policy2 := newACLPolicy(2)
role.Policies = append(role.Policies, structs.ACLRolePolicyLink{
ID: policy2.ID,
Name: policy2.Name,
})
return s.aclRoleSetTxn(tx, tx.Index, role, true)
},
expected: stream.NewCloseSubscriptionEvent(newSecretIDs(1)),
},
{
Name: "role delete",
Setup: newACLRoleWithSingleToken,
Mutate: func(s *Store, tx *txn) error {
role := newACLRole(1, newACLRolePolicyLink(1))
return s.aclRoleDeleteTxn(tx, tx.Index, role.ID, s.aclRoleGetByID, nil)
},
expected: stream.NewCloseSubscriptionEvent(newSecretIDs(1)),
},
}
for _, tc := range cases {
tc := tc
t.Run(tc.Name, func(t *testing.T) {
s := testStateStore(t)
if tc.Setup != nil {
// Bypass the publish mechanism for this test or we get into odd
// recursive stuff...
setupTx := s.db.WriteTxn(10)
require.NoError(t, tc.Setup(s, setupTx))
// Commit the underlying transaction without using wrapped Commit so we
// avoid the whole event publishing system for setup here. It _should_
// work but it makes debugging the test hard as it will call the function
// under test for the setup data...
setupTx.Txn.Commit()
}
tx := s.db.WriteTxn(100)
require.NoError(t, tc.Mutate(s, tx))
// Note we call the func under test directly rather than publishChanges so
// we can test this in isolation.
events, err := aclChangeUnsubscribeEvent(tx, Changes{Index: 100, Changes: tx.Changes()})
require.NoError(t, err)
require.Len(t, events, 1)
actual := events[0]
require.Equal(t, tc.expected, actual)
})
}
}
func newACLRoleWithSingleToken(s *Store, tx *txn) error {
role := newACLRole(1, newACLRolePolicyLink(1))
if err := s.aclRoleSetTxn(tx, tx.Index, role, true); err != nil {
return err
}
token := newACLToken(1)
token.Roles = append(token.Roles, structs.ACLTokenRoleLink{ID: role.ID})
return s.aclTokenSetTxn(tx, tx.Index, token, false, false, false, false)
}
func newACLPolicyWithSingleToken(s *Store, tx *txn) error {
policy := newACLPolicy(1)
if err := s.aclPolicySetTxn(tx, tx.Index, policy); err != nil {
return err
}
token := newACLToken(1)
token.Policies = append(token.Policies, structs.ACLTokenPolicyLink{ID: policy.ID})
return s.aclTokenSetTxn(tx, tx.Index, token, false, false, false, false)
}
func newSecretIDs(ids ...int) []string {
result := make([]string, 0, len(ids))
for _, id := range ids {
uuid := strings.ReplaceAll("11111111-????-????-????-????????????", "?", strconv.Itoa(id))
result = append(result, uuid)
}
return result
}
func newACLToken(n int) *structs.ACLToken {
uuid := strings.ReplaceAll("11111111-????-????-????-????????????", "?", strconv.Itoa(n))
return &structs.ACLToken{
AccessorID: uuid,
SecretID: uuid,
}
}
func newACLPolicy(n int) *structs.ACLPolicy {
numStr := strconv.Itoa(n)
uuid := strings.ReplaceAll("22222222-????-????-????-????????????", "?", numStr)
return &structs.ACLPolicy{
ID: uuid,
Name: "test_policy_" + numStr,
Rules: `operator = "read"`,
}
}
func newACLRole(n int, policies ...structs.ACLRolePolicyLink) *structs.ACLRole {
numStr := strconv.Itoa(n)
uuid := strings.ReplaceAll("33333333-????-????-????-????????????", "?", numStr)
return &structs.ACLRole{
ID: uuid,
Name: "test_role_" + numStr,
Policies: policies,
}
}
func newACLRolePolicyLink(n int) structs.ACLRolePolicyLink {
policy := newACLPolicy(n)
return structs.ACLRolePolicyLink{
ID: policy.ID,
Name: policy.Name,
}
}


@ -289,11 +289,11 @@ func (s *Store) aclTokenListGlobal(tx *txn, _ *structs.EnterpriseMeta) (memdb.Re
return tx.Get("acl-tokens", "local", false)
}
func (s *Store) aclTokenListByPolicy(tx *txn, policy string, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) {
func aclTokenListByPolicy(tx ReadTxn, policy string, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) {
return tx.Get("acl-tokens", "policies", policy)
}
func (s *Store) aclTokenListByRole(tx *txn, role string, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) {
func aclTokenListByRole(tx ReadTxn, role string, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) {
return tx.Get("acl-tokens", "roles", role)
}
@ -355,7 +355,7 @@ func (s *Store) aclRoleList(tx *txn, _ *structs.EnterpriseMeta) (memdb.ResultIte
return tx.Get("acl-roles", "id")
}
func (s *Store) aclRoleListByPolicy(tx *txn, policy string, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) {
func aclRoleListByPolicy(tx ReadTxn, policy string, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) {
return tx.Get("acl-roles", "policies", policy)
}


@ -1,15 +1,37 @@
package state
import (
"fmt"
"github.com/hashicorp/consul/agent/consul/stream"
"github.com/hashicorp/go-memdb"
)
// ReadTxn is implemented by memdb.Txn to perform read operations.
type ReadTxn interface {
Get(table, index string, args ...interface{}) (memdb.ResultIterator, error)
Abort()
}
// Changes wraps a memdb.Changes to include the index at which these changes
// were made.
type Changes struct {
// Index is the latest index at the time these changes were committed.
Index uint64
Changes memdb.Changes
}
// changeTrackerDB is a thin wrapper around memdb.DB which enables TrackChanges on
// all write transactions. When the transaction is committed the changes are
// sent to the eventPublisher which will create and emit change events.
type changeTrackerDB struct {
db *memdb.MemDB
// TODO(streaming): add publisher
db *memdb.MemDB
publisher eventPublisher
processChanges func(ReadTxn, Changes) ([]stream.Event, error)
}
type eventPublisher interface {
Publish(events []stream.Event)
}
// Txn exists to maintain backwards compatibility with memdb.DB.Txn. Preexisting
@ -17,17 +39,20 @@ type changeTrackerDB struct {
// with write=true.
//
// Deprecated: use either ReadTxn, or WriteTxn.
func (db *changeTrackerDB) Txn(write bool) *txn {
func (c *changeTrackerDB) Txn(write bool) *txn {
if write {
panic("don't use db.Txn(true), use db.WriteTxn(idx uin64)")
}
return db.ReadTxn()
return c.ReadTxn()
}
// ReadTxn returns a read-only transaction which behaves exactly the same as
// memdb.Txn
func (db *changeTrackerDB) ReadTxn() *txn {
return &txn{Txn: db.db.Txn(false)}
//
// TODO: this could return a regular memdb.Txn if all the state functions accepted
// the ReadTxn interface
func (c *changeTrackerDB) ReadTxn() *txn {
return &txn{Txn: c.db.Txn(false)}
}
// WriteTxn returns a wrapped memdb.Txn suitable for writes to the state store.
@ -40,27 +65,39 @@ func (db *changeTrackerDB) ReadTxn() *txn {
// The exceptional cases are transactions that are executed on an empty
// memdb.DB as part of Restore, and those executed by tests where we insert
// data directly into the DB. These cases may use WriteTxnRestore.
func (db *changeTrackerDB) WriteTxn(idx uint64) *txn {
func (c *changeTrackerDB) WriteTxn(idx uint64) *txn {
t := &txn{
Txn: db.db.Txn(true),
Index: idx,
Txn: c.db.Txn(true),
Index: idx,
publish: c.publish,
}
t.Txn.TrackChanges()
return t
}
func (c *changeTrackerDB) publish(changes Changes) error {
readOnlyTx := c.db.Txn(false)
defer readOnlyTx.Abort()
events, err := c.processChanges(readOnlyTx, changes)
if err != nil {
return fmt.Errorf("failed generating events from changes: %v", err)
}
c.publisher.Publish(events)
return nil
}
// WriteTxnRestore returns a wrapped RW transaction that does NOT have change
// tracking enabled. This should only be used in Restore where we need to
// replace the entire contents of the Store without a need to track the changes.
// WriteTxnRestore uses a zero index since the whole restore doesn't really occur
// at one index - the effect is to write many values that were previously
// written across many indexes.
func (db *changeTrackerDB) WriteTxnRestore() *txn {
t := &txn{
Txn: db.db.Txn(true),
func (c *changeTrackerDB) WriteTxnRestore() *txn {
return &txn{
Txn: c.db.Txn(true),
Index: 0,
}
return t
}
// txn wraps a memdb.Txn to capture changes and send them to the EventPublisher.
@ -70,12 +107,13 @@ func (db *changeTrackerDB) WriteTxnRestore() *txn {
// error. Any errors from the callback would be lost, which would result in a
// missing change event, even though the state store had changed.
type txn struct {
*memdb.Txn
// Index in raft where the write is occurring. The value is zero for a
// read-only transaction, and for a WriteTxnRestore transaction.
// read-only, or WriteTxnRestore transaction.
// Index is stored so that it may be passed along to any subscribers as part
// of a change event.
Index uint64
*memdb.Txn
Index uint64
publish func(changes Changes) error
}
// Commit first pushes changes to EventPublisher, then calls Commit on the
@ -85,9 +123,35 @@ type txn struct {
// by the caller. A non-nil error indicates that a commit failed and was not
// applied.
func (tx *txn) Commit() error {
// changes may be empty if this is a read-only or WriteTxnRestore transaction.
// TODO(streaming): publish changes: changes := tx.Txn.Changes()
// publish may be nil if this is a read-only or WriteTxnRestore transaction.
// In those cases changes should also be empty, and there will be nothing
// to publish.
if tx.publish != nil {
changes := Changes{
Index: tx.Index,
Changes: tx.Txn.Changes(),
}
if err := tx.publish(changes); err != nil {
return err
}
}
tx.Txn.Commit()
return nil
}
// TODO: may be replaced by a gRPC type.
type topic string
func (t topic) String() string {
return string(t)
}
func processDBChanges(tx ReadTxn, changes Changes) ([]stream.Event, error) {
// TODO: add other table handlers here.
return aclChangeUnsubscribeEvent(tx, changes)
}
func newSnapshotHandlers() stream.SnapshotHandlers {
return stream.SnapshotHandlers{}
}
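
A minimal sketch of the intended write path (illustration only, assuming code inside the state package with its usual imports; the index and table values are placeholders): WriteTxn turns on change tracking, and the wrapped Commit hands the accumulated changes to the publish callback before committing to memdb.

func exampleWrite(db *changeTrackerDB, token *structs.ACLToken) error {
    tx := db.WriteTxn(42) // 42 stands in for the raft index of this write
    defer tx.Abort()      // no-op once Commit succeeds
    if err := tx.Insert("acl-tokens", token); err != nil {
        return err
    }
    // Commit builds Changes{Index: 42, Changes: tx.Changes()}, runs
    // processDBChanges against a read-only txn, publishes the resulting
    // events, and only then commits the underlying memdb.Txn.
    return tx.Commit()
}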


@ -1,9 +1,12 @@
package state
import (
"context"
"errors"
"fmt"
"time"
"github.com/hashicorp/consul/agent/consul/stream"
"github.com/hashicorp/consul/agent/structs"
memdb "github.com/hashicorp/go-memdb"
)
@ -104,6 +107,10 @@ type Store struct {
// abandoned (usually during a restore). This is only ever closed.
abandonCh chan struct{}
// TODO: refactor abandonCh to use a context so that both can use the same
// cancel mechanism.
stopEventPublisher func()
// kvsGraveyard manages tombstones for the key value store.
kvsGraveyard *Graveyard
@ -153,15 +160,18 @@ func NewStateStore(gc *TombstoneGC) (*Store, error) {
return nil, fmt.Errorf("Failed setting up state store: %s", err)
}
// Create and return the state store.
ctx, cancel := context.WithCancel(context.TODO())
s := &Store{
schema: schema,
abandonCh: make(chan struct{}),
kvsGraveyard: NewGraveyard(gc),
lockDelay: NewDelay(),
}
s.db = &changeTrackerDB{
db: db,
db: &changeTrackerDB{
db: db,
publisher: stream.NewEventPublisher(ctx, newSnapshotHandlers(), 10*time.Second),
processChanges: processDBChanges,
},
stopEventPublisher: cancel,
}
return s, nil
}
@ -234,6 +244,7 @@ func (s *Store) AbandonCh() <-chan struct{} {
// Abandon is used to signal that the given state store has been abandoned.
// Calling this more than one time will panic.
func (s *Store) Abandon() {
s.stopEventPublisher()
close(s.abandonCh)
}
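
For context (not part of the diff): the publisher goroutine started in NewStateStore lives until the store is abandoned, because Abandon invokes the stored cancel function before closing abandonCh. A trivial sketch:

func exampleLifecycle() error {
    s, err := NewStateStore(nil) // nil TombstoneGC keeps the sketch short
    if err != nil {
        return err
    }
    // ... serve reads and writes ...
    s.Abandon() // cancels the publisher's context, then closes AbandonCh()
    return nil
}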


@ -0,0 +1,445 @@
package state
import (
"context"
"testing"
"time"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/consul/stream"
"github.com/hashicorp/consul/agent/structs"
"github.com/stretchr/testify/require"
)
func TestStore_IntegrationWithEventPublisher_ACLTokenUpdate(t *testing.T) {
t.Parallel()
require := require.New(t)
s := testACLTokensStateStore(t)
// Setup token and wait for good state
token := createTokenAndWaitForACLEventPublish(t, s)
// Register the subscription.
subscription := &stream.SubscribeRequest{
Topic: topicService,
Key: "nope",
Token: token.SecretID,
}
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
publisher := stream.NewEventPublisher(ctx, newTestSnapshotHandlers(s), 0)
s.db.publisher = publisher
sub, err := publisher.Subscribe(subscription)
require.NoError(err)
defer sub.Unsubscribe()
eventCh := testRunSub(sub)
// Stream should get EndOfSnapshot
e := assertEvent(t, eventCh)
require.True(e.IsEndOfSnapshot())
// Update an unrelated token.
token2 := &structs.ACLToken{
AccessorID: "a7bbf480-8440-4f55-acfc-6fdca25cb13e",
SecretID: "72e81982-7a0f-491f-a60e-c9c802ac1402",
}
token2.SetHash(false)
require.NoError(s.ACLTokenSet(3, token2.Clone(), false))
// Ensure there's no reset event.
assertNoEvent(t, eventCh)
// Now update the token used in the subscriber.
token3 := &structs.ACLToken{
AccessorID: "3af117a9-2233-4cf4-8ff8-3c749c9906b4",
SecretID: "4268ce0d-d7ae-4718-8613-42eba9036020",
Description: "something else",
}
token3.SetHash(false)
require.NoError(s.ACLTokenSet(4, token3.Clone(), false))
// Ensure the reset event was sent.
err = assertErr(t, eventCh)
require.Equal(stream.ErrSubscriptionClosed, err)
// Register another subscription.
subscription2 := &stream.SubscribeRequest{
Topic: topicService,
Key: "nope",
Token: token.SecretID,
}
sub2, err := publisher.Subscribe(subscription2)
require.NoError(err)
defer sub2.Unsubscribe()
eventCh2 := testRunSub(sub2)
// Expect initial EoS
e = assertEvent(t, eventCh2)
require.True(e.IsEndOfSnapshot())
// Delete the unrelated token.
require.NoError(s.ACLTokenDeleteByAccessor(5, token2.AccessorID, nil))
// Ensure there's no reset event.
assertNoEvent(t, eventCh2)
// Delete the token used by the subscriber.
require.NoError(s.ACLTokenDeleteByAccessor(6, token.AccessorID, nil))
// Ensure the reset event was sent.
err = assertErr(t, eventCh2)
require.Equal(stream.ErrSubscriptionClosed, err)
}
func TestStore_IntegrationWithEventPublisher_ACLPolicyUpdate(t *testing.T) {
t.Parallel()
require := require.New(t)
s := testACLTokensStateStore(t)
// Create token and wait for good state
token := createTokenAndWaitForACLEventPublish(t, s)
// Register the subscription.
subscription := &stream.SubscribeRequest{
Topic: topicService,
Key: "nope",
Token: token.SecretID,
}
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
publisher := stream.NewEventPublisher(ctx, newTestSnapshotHandlers(s), 0)
s.db.publisher = publisher
sub, err := publisher.Subscribe(subscription)
require.NoError(err)
defer sub.Unsubscribe()
eventCh := testRunSub(sub)
// Ignore the end of snapshot event
e := assertEvent(t, eventCh)
require.True(e.IsEndOfSnapshot(), "event should be a EoS got %v", e)
// Update an unrelated policy.
policy2 := structs.ACLPolicy{
ID: testPolicyID_C,
Name: "foo-read",
Rules: `node "foo" { policy = "read" }`,
Syntax: acl.SyntaxCurrent,
Datacenters: []string{"dc1"},
}
policy2.SetHash(false)
require.NoError(s.ACLPolicySet(3, &policy2))
// Ensure there's no reset event.
assertNoEvent(t, eventCh)
// Now update the policy used in the subscriber.
policy3 := structs.ACLPolicy{
ID: testPolicyID_A,
Name: "node-read",
Rules: `node_prefix "" { policy = "write" }`,
Syntax: acl.SyntaxCurrent,
Datacenters: []string{"dc1"},
}
policy3.SetHash(false)
require.NoError(s.ACLPolicySet(4, &policy3))
// Ensure the reset event was sent.
assertReset(t, eventCh, true)
// Register another subscription.
subscription2 := &stream.SubscribeRequest{
Topic: topicService,
Key: "nope",
Token: token.SecretID,
}
sub, err = publisher.Subscribe(subscription2)
require.NoError(err)
eventCh = testRunSub(sub)
// Ignore the end of snapshot event
e = assertEvent(t, eventCh)
require.True(e.IsEndOfSnapshot(), "event should be a EoS got %v", e)
// Delete the unrelated policy.
require.NoError(s.ACLPolicyDeleteByID(5, testPolicyID_C, nil))
// Ensure there's no reload event.
assertNoEvent(t, eventCh)
// Delete the policy used by the subscriber.
require.NoError(s.ACLPolicyDeleteByID(6, testPolicyID_A, nil))
// Ensure the reload event was sent.
err = assertErr(t, eventCh)
require.Equal(stream.ErrSubscriptionClosed, err)
// Register another subscription.
subscription3 := &stream.SubscribeRequest{
Topic: topicService,
Key: "nope",
Token: token.SecretID,
}
sub, err = publisher.Subscribe(subscription3)
require.NoError(err)
defer sub.Unsubscribe()
eventCh = testRunSub(sub)
// Ignore the end of snapshot event
e = assertEvent(t, eventCh)
require.True(e.IsEndOfSnapshot(), "event should be a EoS got %v", e)
// Now update the policy used in role B, but not directly in the token.
policy4 := structs.ACLPolicy{
ID: testPolicyID_B,
Name: "node-read",
Rules: `node_prefix "foo" { policy = "read" }`,
Syntax: acl.SyntaxCurrent,
Datacenters: []string{"dc1"},
}
policy4.SetHash(false)
require.NoError(s.ACLPolicySet(7, &policy4))
// Ensure the reset event was sent.
assertReset(t, eventCh, true)
}
func TestStore_IntegrationWithEventPublisher_ACLRoleUpdate(t *testing.T) {
t.Parallel()
require := require.New(t)
s := testACLTokensStateStore(t)
// Create token and wait for good state
token := createTokenAndWaitForACLEventPublish(t, s)
// Register the subscription.
subscription := &stream.SubscribeRequest{
Topic: topicService,
Key: "nope",
Token: token.SecretID,
}
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
publisher := stream.NewEventPublisher(ctx, newTestSnapshotHandlers(s), 0)
s.db.publisher = publisher
sub, err := publisher.Subscribe(subscription)
require.NoError(err)
eventCh := testRunSub(sub)
// Stream should get EndOfSnapshot
e := assertEvent(t, eventCh)
require.True(e.IsEndOfSnapshot())
// Update an unrelated role (the token has role testRoleID_B).
role := structs.ACLRole{
ID: testRoleID_A,
Name: "unrelated-role",
Description: "test",
}
role.SetHash(false)
require.NoError(s.ACLRoleSet(3, &role))
// Ensure there's no reload event.
assertNoEvent(t, eventCh)
// Now update the role used by the token in the subscriber.
role2 := structs.ACLRole{
ID: testRoleID_B,
Name: "my-new-role",
Description: "changed",
}
role2.SetHash(false)
require.NoError(s.ACLRoleSet(4, &role2))
// Ensure the reload event was sent.
assertReset(t, eventCh, false)
// Register another subscription.
subscription2 := &stream.SubscribeRequest{
Topic: topicService,
Key: "nope",
Token: token.SecretID,
}
sub, err = publisher.Subscribe(subscription2)
require.NoError(err)
eventCh = testRunSub(sub)
// Ignore the end of snapshot event
e = assertEvent(t, eventCh)
require.True(e.IsEndOfSnapshot(), "event should be a EoS got %v", e)
// Delete the unrelated policy.
require.NoError(s.ACLRoleDeleteByID(5, testRoleID_A, nil))
// Ensure there's no reload event.
assertNoEvent(t, eventCh)
// Delete the policy used by the subscriber.
require.NoError(s.ACLRoleDeleteByID(6, testRoleID_B, nil))
// Ensure the reload event was sent.
assertReset(t, eventCh, false)
}
type nextResult struct {
Events []stream.Event
Err error
}
func testRunSub(sub *stream.Subscription) <-chan nextResult {
eventCh := make(chan nextResult, 1)
go func() {
for {
es, err := sub.Next(context.TODO())
eventCh <- nextResult{
Events: es,
Err: err,
}
if err != nil {
return
}
}
}()
return eventCh
}
func assertNoEvent(t *testing.T, eventCh <-chan nextResult) {
t.Helper()
select {
case next := <-eventCh:
require.NoError(t, next.Err)
require.Len(t, next.Events, 1)
t.Fatalf("got unwanted event: %#v", next.Events[0].Payload)
case <-time.After(100 * time.Millisecond):
}
}
func assertEvent(t *testing.T, eventCh <-chan nextResult) *stream.Event {
t.Helper()
select {
case next := <-eventCh:
require.NoError(t, next.Err)
require.Len(t, next.Events, 1)
return &next.Events[0]
case <-time.After(100 * time.Millisecond):
t.Fatalf("no event after 100ms")
}
return nil
}
func assertErr(t *testing.T, eventCh <-chan nextResult) error {
t.Helper()
select {
case next := <-eventCh:
require.Error(t, next.Err)
return next.Err
case <-time.After(100 * time.Millisecond):
t.Fatalf("no err after 100ms")
}
return nil
}
// assertReset checks that a ResetStream event is sent to the subscription
// within 100ms. If allowEOS is true it will ignore any intermediate events that
// come before the reset provided they are EndOfSnapshot events because in many
// cases it's non-deterministic whether the snapshot will complete before the
// acl reset is handled.
func assertReset(t *testing.T, eventCh <-chan nextResult, allowEOS bool) {
t.Helper()
for {
select {
case next := <-eventCh:
if allowEOS {
if next.Err == nil && len(next.Events) == 1 && next.Events[0].IsEndOfSnapshot() {
continue
}
}
require.Error(t, next.Err)
require.Equal(t, stream.ErrSubscriptionClosed, next.Err)
return
case <-time.After(100 * time.Millisecond):
t.Fatalf("no err after 100ms")
}
}
}
var topicService stream.Topic = topic("test-topic-service")
func newTestSnapshotHandlers(s *Store) stream.SnapshotHandlers {
return stream.SnapshotHandlers{
topicService: func(req *stream.SubscribeRequest, snap stream.SnapshotAppender) (uint64, error) {
idx, nodes, err := s.ServiceNodes(nil, req.Key, nil)
if err != nil {
return idx, err
}
for _, node := range nodes {
event := stream.Event{
Topic: req.Topic,
Key: req.Key,
Index: node.ModifyIndex,
Payload: node,
}
snap.Append([]stream.Event{event})
}
return idx, nil
},
}
}
func createTokenAndWaitForACLEventPublish(t *testing.T, s *Store) *structs.ACLToken {
token := &structs.ACLToken{
AccessorID: "3af117a9-2233-4cf4-8ff8-3c749c9906b4",
SecretID: "4268ce0d-d7ae-4718-8613-42eba9036020",
Description: "something",
Policies: []structs.ACLTokenPolicyLink{
{ID: testPolicyID_A},
},
Roles: []structs.ACLTokenRoleLink{
{ID: testRoleID_B},
},
}
token.SetHash(false)
// If we subscribe immediately after we create a token we race with the
// publisher that is publishing the ACL token event for the token we just
// created. That means that the subscription we create right after will often
// be immediately reset. The most reliable way to avoid that without just
// sleeping for some arbitrary time is to pre-subscribe using the token before
// it actually exists (which works because the publisher doesn't check tokens;
// it assumes something lower down already did that) and then wait for it to be reset
// so we know the initial token write event has been sent out before
// continuing...
req := &stream.SubscribeRequest{
Topic: topicService,
Key: "nope",
Token: token.SecretID,
}
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
publisher := stream.NewEventPublisher(ctx, newTestSnapshotHandlers(s), 0)
s.db.publisher = publisher
sub, err := publisher.Subscribe(req)
require.NoError(t, err)
defer sub.Unsubscribe()
eventCh := testRunSub(sub)
// Create the ACL token to be used in the subscription.
require.NoError(t, s.ACLTokenSet(2, token.Clone(), false))
// Wait for the pre-subscription to be reset
assertReset(t, eventCh, true)
return token
}


@ -0,0 +1,52 @@
/*
Package stream provides a publish/subscribe system for events produced by changes
to the state store.
*/
package stream
import "fmt"
// Topic is an identifier that partitions events. A subscription will only receive
// events which match the Topic.
type Topic fmt.Stringer
// Event is a structure with identifiers and a payload. Events are Published to
// EventPublisher and returned to Subscribers.
type Event struct {
Topic Topic
Key string
Index uint64
Payload interface{}
}
// IsEndOfSnapshot returns true if this is a framing event that indicates the
// snapshot has completed. Future events from Subscription.Next will be
// change events.
func (e Event) IsEndOfSnapshot() bool {
return e.Payload == endOfSnapshot{}
}
// IsEndOfEmptySnapshot returns true if this is a framing event that indicates
// there is no snapshot. Future events from Subscription.Next will be
// change events.
func (e Event) IsEndOfEmptySnapshot() bool {
return e.Payload == endOfEmptySnapshot{}
}
type endOfSnapshot struct{}
type endOfEmptySnapshot struct{}
type closeSubscriptionPayload struct {
tokensSecretIDs []string
}
// NewCloseSubscriptionEvent returns a special Event that is handled by the
// stream package, and is never sent to subscribers. The EventPublisher handles
// these events, and closes any subscriptions which were created using a token
// which matches any of the tokenSecretIDs.
//
// tokenSecretIDs may contain duplicate IDs.
func NewCloseSubscriptionEvent(tokenSecretIDs []string) Event {
return Event{Payload: closeSubscriptionPayload{tokensSecretIDs: tokenSecretIDs}}
}
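
A short consumer-side sketch (not part of the diff; handle is a hypothetical name, and the fmt and stream imports are assumed): callers typically check the framing helpers before looking at a payload.

func handle(e stream.Event) {
    switch {
    case e.IsEndOfSnapshot():
        // The snapshot is complete; everything after this is a live change event.
    case e.IsEndOfEmptySnapshot():
        // No snapshot was needed; live change events follow immediately.
    default:
        fmt.Printf("topic=%s key=%s index=%d payload=%v\n", e.Topic, e.Key, e.Index, e.Payload)
    }
}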


@ -0,0 +1,218 @@
package stream
import (
"context"
"errors"
"fmt"
"sync/atomic"
)
// eventBuffer is a single-writer, multiple-reader, unlimited length concurrent
// buffer of events that have been published on a topic. The buffer is
// effectively just the head of an atomically updated singly-linked list. Atomic
// accesses are often a sign of premature optimization, but this specific
// design has several important properties that significantly simplify our
// PubSub machinery.
//
// The eventBuffer only tracks the most recent set of published events, so
// if there are no consumers, older events are automatically garbage collected.
// Consumers are notified of new events by closing a channel on the previous head
// allowing efficient broadcast to many watchers without having to run multiple
// goroutines or deliver to O(N) separate channels.
//
// Because eventBuffer is a linked list with atomically updated pointers, readers don't
// have to take a lock and can consume at their own pace. We also don't need a
// fixed limit on the number of items, which avoids needing to configure
// buffer length to balance wasting memory, against being able to tolerate
// occasionally slow readers.
//
// The buffer is used to deliver all messages broadcast to a topic for active
// subscribers to consume, but it is also an effective way to both deliver and
// optionally cache snapshots per topic and key. By using an eventBuffer,
// snapshot functions don't have to read the whole snapshot into memory before
// delivery - they can stream from memdb. However, simply by storing a pointer to
// the first event in the buffer, we can cache the buffered events for future
// watchers on the same topic. Finally, once we've delivered all the snapshot
// events to the buffer, we can append a next-element which is the first topic
// buffer element with a higher index and so consumers can keep reading the
// same buffer to have subsequent updates streamed after the snapshot is read.
//
// A huge benefit here is that caching snapshots becomes very simple: we don't
// have to do any additional book-keeping to figure out when to truncate the
// topic buffer to keep the snapshot usable, and we don't run into issues
// where a cached snapshot is no longer useful, since the buffer keeps
// elements around only as long as either the cache or a subscriber needs them.
// So we can use whatever simple timeout logic we like to decide how long to
// keep caches (or if we should keep them at all) and the buffers will
// automatically keep the events we need to make that work for exactly the
// optimal amount of time and no longer.
//
// A new buffer is constructed with a sentinel "empty" bufferItem that has a nil
// Events array. This enables subscribers to start watching for the next update
// immediately.
//
// The zero value eventBuffer is _not_ usable, as it has not been
// initialized with an empty bufferItem so can not be used to wait for the first
// published event. Call newEventBuffer to construct a new buffer.
//
// Calls to Append or AppendBuffer that mutate the head must be externally
// synchronized. This allows systems that already serialize writes to append
// without lock overhead (e.g. a snapshot goroutine appending thousands of
// events).
type eventBuffer struct {
head atomic.Value
}
// newEventBuffer creates an eventBuffer ready for use.
func newEventBuffer() *eventBuffer {
b := &eventBuffer{}
b.head.Store(newBufferItem(nil))
return b
}
// Append a set of events from one raft operation to the buffer and notify
// watchers. Note that events must not have been previously made available to
// any other goroutine since we may mutate them to ensure ACL Rules are
// populated. After calling append, the caller must not make any further
// mutations to the events as they may have been exposed to subscribers in other
// goroutines. Append only supports a single concurrent caller and must be
// externally synchronized with other Append, AppendBuffer or AppendErr calls.
func (b *eventBuffer) Append(events []Event) {
b.AppendItem(newBufferItem(events))
}
// AppendItem joins another buffer which may be the tail of a separate buffer;
// for example, a buffer that's had the events from a snapshot appended may
// finally be linked to the topic buffer for the subsequent events so
// subscribers can seamlessly consume the updates. Note that Events in item must
// already be fully populated with ACL rules and must not be mutated further as
// they may have already been published to subscribers.
//
// AppendItem only supports a single concurrent caller and must be externally
// synchronized with other Append and AppendItem calls.
func (b *eventBuffer) AppendItem(item *bufferItem) {
// First store it as the next node for the old head; this ensures that once it's
// visible to new readers the linked list is already valid. Not sure it
// matters but this seems nicer.
oldHead := b.Head()
oldHead.link.next.Store(item)
b.head.Store(item)
// Now that it's added, close the old head's channel to notify waiters.
close(oldHead.link.ch)
// don't set chan to nil since that will race with readers accessing it.
}
// Head returns the current head of the buffer. It will always exist but it may
// be a "sentinel" empty item with a nil Events slice to allow consumers to
// watch for the next update. Consumers should always check for empty Events and
// treat them as no-ops. Will panic if the eventBuffer was not initialized
// correctly with newEventBuffer.
func (b *eventBuffer) Head() *bufferItem {
return b.head.Load().(*bufferItem)
}
// bufferItem represents a set of events published by a single raft operation.
// The first item returned by a newly constructed buffer will have nil Events.
// It is a sentinel value which is used to wait on the next events via Next.
//
// To iterate to the next event, a Next method may be called which may block if
// there is no next element yet.
//
// Holding a pointer to the item keeps all the events published since then in
// memory, so it's important that subscribers don't hold pointers to buffer items
// after they have been delivered, except where it's intentional to maintain a
// cache or trailing store of events for performance reasons.
//
// Subscribers must not mutate the bufferItem or the Events or Encoded payloads
// inside as these are shared between all readers.
type bufferItem struct {
// Events is the set of events published at one raft index. This may be nil as
// a sentinel value to allow watching for the first event in a buffer. Callers
// should check and skip nil Events at any point in the buffer. It will also
// be nil if the producer appends an Error event because they can't complete
// the request to populate the buffer. Err will be non-nil in this case.
Events []Event
// Err is non-nil if the producer can't complete their task and terminates the
// buffer. Subscribers should return the error to clients and cease attempting
// to read from the buffer.
Err error
// link holds the next pointer and channel. This extra bit of indirection
// allows us to splice buffers together at arbitrary points without including
// events in one buffer just for the side-effect of watching for the next set.
// The link may not be mutated once the event is appended to a buffer.
link *bufferLink
}
type bufferLink struct {
// next is an atomically updated pointer to the next event in the buffer. It
// is written exactly once by the single publisher and will always be set if
// ch is closed.
next atomic.Value
// ch is closed when the next event is published. It should never be mutated
// (e.g. set to nil) as that is racy, but is closed once when the next event
// is published. the next pointer will have been set by the time this is
// closed.
ch chan struct{}
}
// newBufferItem returns a blank buffer item with a link and chan ready to have
// the fields set and be appended to a buffer.
func newBufferItem(events []Event) *bufferItem {
return &bufferItem{
link: &bufferLink{ch: make(chan struct{})},
Events: events,
}
}
// Next returns the next buffer item in the buffer. It may block until ctx is
// cancelled or until the next item is published.
func (i *bufferItem) Next(ctx context.Context, forceClose <-chan struct{}) (*bufferItem, error) {
// Block until the next item is available: the link channel is closed once the
// next item is published. Note we don't rely on a state change (setting the
// chan to nil) as that's not threadsafe, but detecting a close is.
select {
case <-ctx.Done():
return nil, ctx.Err()
case <-forceClose:
return nil, fmt.Errorf("subscription closed")
case <-i.link.ch:
}
// If channel closed, there must be a next item to read
nextRaw := i.link.next.Load()
if nextRaw == nil {
// shouldn't be possible
return nil, errors.New("invalid next item")
}
next := nextRaw.(*bufferItem)
if next.Err != nil {
return nil, next.Err
}
return next, nil
}
// NextNoBlock returns the next item in the buffer without blocking. If it
// reaches the most recent item it will return nil.
func (i *bufferItem) NextNoBlock() *bufferItem {
nextRaw := i.link.next.Load()
if nextRaw == nil {
return nil
}
return nextRaw.(*bufferItem)
}
// NextLink returns either the next item in the buffer if there is one, or
// an empty item (that will be ignored by subscribers) that has a pointer to
// the same link as this bufferItem (but none of the bufferItem content).
// When the link.ch is closed, subscriptions will be notified of the next item.
func (i *bufferItem) NextLink() *bufferItem {
next := i.NextNoBlock()
if next == nil {
// Return an empty item that can be followed to the next item published.
return &bufferItem{link: i.link}
}
return next
}
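
A sketch of the contract described above (illustration only, written as if inside the stream package since eventBuffer is unexported, with the context and fmt imports assumed; topic and the loop bounds are placeholders): a single writer appends while each reader holds an item and walks the list with Next.

func exampleFollow(ctx context.Context, topic Topic) {
    buf := newEventBuffer()
    item := buf.Head() // sentinel item with nil Events

    // Single writer: Append calls must not be made concurrently.
    go func() {
        for i := uint64(1); i <= 3; i++ {
            buf.Append([]Event{{Topic: topic, Index: i}})
        }
    }()

    for {
        next, err := item.Next(ctx, nil) // nil forceClose: only ctx can cancel
        if err != nil {
            return // ctx cancelled, or the buffer was terminated with an error
        }
        for _, e := range next.Events {
            fmt.Println("saw index", e.Index)
        }
        item = next
    }
}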


@ -0,0 +1,89 @@
package stream
import (
"context"
fmt "fmt"
"math/rand"
"testing"
time "time"
"github.com/stretchr/testify/assert"
)
// A property-based test to ensure that under heavy concurrent use trivial
// correctness properties are not violated (and that -race doesn't complain).
func TestEventBufferFuzz(t *testing.T) {
if testing.Short() {
t.Skip("too slow for short run")
}
nReaders := 1000
nMessages := 1000
b := newEventBuffer()
// Start a write goroutine that will publish nMessages messages with sequential
// indexes and some jitter in timing (to allow clients to "catch up" and block
// waiting for updates).
go func() {
seed := time.Now().UnixNano()
t.Logf("Using seed %d", seed)
// z is a Zipfian distribution that gives us a number of milliseconds to
// sleep which are mostly low - near zero but occasionally spike up to near
// 100.
z := rand.NewZipf(rand.New(rand.NewSource(seed)), 1.5, 1.5, 50)
for i := 0; i < nMessages; i++ {
// Event content is arbitrary and not valid for our use of buffers in
// streaming - here we only care about the semantics of the buffer.
e := Event{
Index: uint64(i), // Indexes should be contiguous
Topic: testTopic,
}
b.Append([]Event{e})
// Sleep sometimes for a while to let some subscribers catch up
wait := time.Duration(z.Uint64()) * time.Millisecond
time.Sleep(wait)
}
}()
// Run n subscribers following and verifying
errCh := make(chan error, nReaders)
// Load head here so all subscribers start from the same point or they might
// not run until several appends have already happened.
head := b.Head()
for i := 0; i < nReaders; i++ {
go func(i int) {
expect := uint64(0)
item := head
var err error
for {
item, err = item.Next(context.Background(), nil)
if err != nil {
errCh <- fmt.Errorf("subscriber %05d failed getting next %d: %s", i,
expect, err)
return
}
if item.Events[0].Index != expect {
errCh <- fmt.Errorf("subscriber %05d got bad event want=%d, got=%d", i,
expect, item.Events[0].Index)
return
}
expect++
if expect == uint64(nMessages) {
// Succeeded
errCh <- nil
return
}
}
}(i)
}
// Wait for all readers to finish one way or other
for i := 0; i < nReaders; i++ {
err := <-errCh
assert.NoError(t, err)
}
}


@ -0,0 +1,282 @@
package stream
import (
"context"
"fmt"
"sync"
"time"
)
// EventPublisher receives change events from Publish, and sends the events to
// all subscribers of the event Topic.
type EventPublisher struct {
// snapCacheTTL controls how long we keep snapshots in our cache before
// allowing them to be garbage collected and a new one made for subsequent
// requests for that topic and key. In general this should be pretty short to
// keep memory overhead of duplicated event data low - snapshots are typically
// not that expensive, but having a cache for a few seconds can help
// de-duplicate building the same snapshot over and over again when a
// thundering herd of watchers all subscribe to the same topic within a few
// seconds.
snapCacheTTL time.Duration
// This lock protects the topicBuffers, and snapCache
lock sync.RWMutex
// topicBuffers stores the head of the linked-list buffer to publish events to
// for a topic.
topicBuffers map[Topic]*eventBuffer
// snapCache is a cache of eventSnapshots indexed by topic and key.
// TODO(streaming): new snapshotCache struct for snapCache and snapCacheTTL
snapCache map[Topic]map[string]*eventSnapshot
subscriptions *subscriptions
// publishCh is used to send messages from an active txn to a goroutine which
// publishes events, so that publishing can happen asynchronously from
// the Commit call in the FSM hot path.
publishCh chan changeEvents
snapshotHandlers SnapshotHandlers
}
type subscriptions struct {
// lock for byToken. If both subscription.lock and EventPublisher.lock need
// to be held, EventPublisher.lock MUST always be acquired first.
lock sync.RWMutex
// byToken is a mapping of active Subscriptions indexed by the token and
// a pointer to the request.
// When the token is modified all subscriptions under that token will be
// reloaded.
// A subscription may be unsubscribed by using the pointer to the request.
byToken map[string]map[*SubscribeRequest]*Subscription
}
type changeEvents struct {
events []Event
}
// SnapshotHandlers is a mapping of Topic to a function which produces a snapshot
// of events for the SubscribeRequest. Events are appended to the snapshot using SnapshotAppender.
// The nil Topic is reserved and should not be used.
type SnapshotHandlers map[Topic]func(*SubscribeRequest, SnapshotAppender) (index uint64, err error)
// SnapshotAppender appends groups of events to create a Snapshot of state.
type SnapshotAppender interface {
// Append events to the snapshot. Every event in the slice must have the same
// Index, indicating that it is part of the same raft transaction.
Append(events []Event)
}
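
As an illustration of the handler shape (not part of the diff; the topic type, key, index, and payload are placeholders modelled on the test handlers later in this PR):

type exampleTopic string

func (t exampleTopic) String() string { return string(t) }

var exampleHandlers = SnapshotHandlers{
    exampleTopic("services"): func(req *SubscribeRequest, snap SnapshotAppender) (uint64, error) {
        // Look up the current state for req.Key, append it in one or more
        // groups, and return the index the snapshot is valid at.
        snap.Append([]Event{{Topic: req.Topic, Key: req.Key, Index: 5, Payload: "current-state"}})
        return 5, nil
    },
}
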
// NewEventPublisher returns an EventPublisher for publishing change events.
// Handlers are used to convert the memDB changes into events.
// A goroutine is run in the background to publish events to all subscribers.
// Cancelling the context will shut down the goroutine, free its resources,
// and stop all publishing.
func NewEventPublisher(ctx context.Context, handlers SnapshotHandlers, snapCacheTTL time.Duration) *EventPublisher {
e := &EventPublisher{
snapCacheTTL: snapCacheTTL,
topicBuffers: make(map[Topic]*eventBuffer),
snapCache: make(map[Topic]map[string]*eventSnapshot),
publishCh: make(chan changeEvents, 64),
subscriptions: &subscriptions{
byToken: make(map[string]map[*SubscribeRequest]*Subscription),
},
snapshotHandlers: handlers,
}
go e.handleUpdates(ctx)
return e
}
// Publish events to all subscribers of the event Topic.
func (e *EventPublisher) Publish(events []Event) {
if len(events) > 0 {
e.publishCh <- changeEvents{events: events}
}
}
func (e *EventPublisher) handleUpdates(ctx context.Context) {
for {
select {
case <-ctx.Done():
// TODO: also close all subscriptions so the subscribers are moved
// to the new publisher?
return
case update := <-e.publishCh:
e.sendEvents(update)
}
}
}
// sendEvents sends the given events to any applicable topic listeners. It also
// handles any ACL update events by closing affected subscriptions so that those
// listeners reset their streams.
func (e *EventPublisher) sendEvents(update changeEvents) {
eventsByTopic := make(map[Topic][]Event)
for _, event := range update.events {
if unsubEvent, ok := event.Payload.(closeSubscriptionPayload); ok {
e.subscriptions.closeSubscriptionsForTokens(unsubEvent.tokensSecretIDs)
continue
}
eventsByTopic[event.Topic] = append(eventsByTopic[event.Topic], event)
}
e.lock.Lock()
defer e.lock.Unlock()
for topic, events := range eventsByTopic {
e.getTopicBuffer(topic).Append(events)
}
}
// getTopicBuffer for the topic. Creates a new event buffer if one does not
// already exist.
//
// EventPublisher.lock must be held to call this method.
func (e *EventPublisher) getTopicBuffer(topic Topic) *eventBuffer {
buf, ok := e.topicBuffers[topic]
if !ok {
buf = newEventBuffer()
e.topicBuffers[topic] = buf
}
return buf
}
// Subscribe returns a new Subscription for the given request. A subscription
// will receive an initial snapshot of events matching the request, unless
// req.Index is non-zero and already matches the head of the topic buffer.
// After the snapshot, events will be streamed as they are created.
// Subscriptions may be closed, forcing the client to resubscribe (for example if
// ACL policies changed or the state store is abandoned).
//
// When the caller is finished with the subscription for any reason, it must
// call Subscription.Unsubscribe to free ACL tracking resources.
func (e *EventPublisher) Subscribe(req *SubscribeRequest) (*Subscription, error) {
// Ensure we know how to make a snapshot for this topic
_, ok := e.snapshotHandlers[req.Topic]
if !ok || req.Topic == nil {
return nil, fmt.Errorf("unknown topic %v", req.Topic)
}
e.lock.Lock()
defer e.lock.Unlock()
// Ensure there is a topic buffer for that topic so we start capturing any
// future published events.
buf := e.getTopicBuffer(req.Topic)
// See if we need a snapshot
topicHead := buf.Head()
var sub *Subscription
if req.Index > 0 && len(topicHead.Events) > 0 && topicHead.Events[0].Index == req.Index {
// No need for a snapshot. Send the "end of empty snapshot" message to signal to
// the client that its cache is still good, then follow along from here in the topic.
buf := newEventBuffer()
// Store the head of that buffer before we append to it to give as the
// starting point for the subscription.
subHead := buf.Head()
buf.Append([]Event{{
Index: req.Index,
Topic: req.Topic,
Key: req.Key,
Payload: endOfEmptySnapshot{},
}})
// Now splice the rest of the topic buffer on so the subscription will
// continue to see future updates in the topic buffer.
buf.AppendItem(topicHead.NextLink())
sub = newSubscription(req, subHead, e.subscriptions.unsubscribe(req))
} else {
snap, err := e.getSnapshotLocked(req, topicHead)
if err != nil {
return nil, err
}
sub = newSubscription(req, snap.Head, e.subscriptions.unsubscribe(req))
}
e.subscriptions.add(req, sub)
return sub, nil
}
func (s *subscriptions) add(req *SubscribeRequest, sub *Subscription) {
s.lock.Lock()
defer s.lock.Unlock()
subsByToken, ok := s.byToken[req.Token]
if !ok {
subsByToken = make(map[*SubscribeRequest]*Subscription)
s.byToken[req.Token] = subsByToken
}
subsByToken[req] = sub
}
func (s *subscriptions) closeSubscriptionsForTokens(tokenSecretIDs []string) {
s.lock.RLock()
defer s.lock.RUnlock()
for _, secretID := range tokenSecretIDs {
if subs, ok := s.byToken[secretID]; ok {
for _, sub := range subs {
sub.forceClose()
}
}
}
}
// unsubscribe returns a function that the subscription will call to remove
// itself from the subsByToken.
// This function is returned as a closure so that the caller doesn't need to keep
// track of the SubscribeRequest, and cannot accidentally call unsubscribe with the
// wrong pointer.
func (s *subscriptions) unsubscribe(req *SubscribeRequest) func() {
return func() {
s.lock.Lock()
defer s.lock.Unlock()
subsByToken, ok := s.byToken[req.Token]
if !ok {
return
}
delete(subsByToken, req)
if len(subsByToken) == 0 {
delete(s.byToken, req.Token)
}
}
}
func (e *EventPublisher) getSnapshotLocked(req *SubscribeRequest, topicHead *bufferItem) (*eventSnapshot, error) {
topicSnaps, ok := e.snapCache[req.Topic]
if !ok {
topicSnaps = make(map[string]*eventSnapshot)
e.snapCache[req.Topic] = topicSnaps
}
snap, ok := topicSnaps[req.Key]
if ok && snap.err() == nil {
return snap, nil
}
handler, ok := e.snapshotHandlers[req.Topic]
if !ok {
return nil, fmt.Errorf("unknown topic %v", req.Topic)
}
snap = newEventSnapshot(req, topicHead, handler)
if e.snapCacheTTL > 0 {
topicSnaps[req.Key] = snap
// Trigger a clearout after TTL
time.AfterFunc(e.snapCacheTTL, func() {
e.lock.Lock()
defer e.lock.Unlock()
delete(topicSnaps, req.Key)
})
}
return snap, nil
}
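
A consumer-side sketch tying the pieces above together (not part of the diff; handlers, the topic, the key, and the token are placeholders, error handling is abbreviated, and the context, time, and stream imports are assumed):

func watch(ctx context.Context, handlers stream.SnapshotHandlers, topic stream.Topic, token string) error {
    publisher := stream.NewEventPublisher(ctx, handlers, 10*time.Second)

    sub, err := publisher.Subscribe(&stream.SubscribeRequest{Topic: topic, Key: "web", Token: token})
    if err != nil {
        return err
    }
    defer sub.Unsubscribe()

    for {
        events, err := sub.Next(ctx)
        if err != nil {
            // For example stream.ErrSubscriptionClosed after an ACL change
            // affecting this token; callers are expected to resubscribe.
            return err
        }
        for _, e := range events {
            if e.IsEndOfSnapshot() || e.IsEndOfEmptySnapshot() {
                continue // framing events carry no payload to handle
            }
            // handle e.Payload here
        }
    }
}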


@ -0,0 +1,113 @@
package stream
import (
"context"
"fmt"
"testing"
"time"
"github.com/stretchr/testify/require"
)
type intTopic int
func (i intTopic) String() string {
return fmt.Sprintf("%d", i)
}
var testTopic Topic = intTopic(999)
func TestEventPublisher_PublishChangesAndSubscribe_WithSnapshot(t *testing.T) {
subscription := &SubscribeRequest{
Topic: testTopic,
Key: "sub-key",
}
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
publisher := NewEventPublisher(ctx, newTestSnapshotHandlers(), 0)
sub, err := publisher.Subscribe(subscription)
require.NoError(t, err)
eventCh := consumeSubscription(ctx, sub)
result := nextResult(t, eventCh)
require.NoError(t, result.Err)
expected := []Event{{Payload: "snapshot-event-payload", Key: "sub-key"}}
require.Equal(t, expected, result.Events)
result = nextResult(t, eventCh)
require.Len(t, result.Events, 1)
require.True(t, result.Events[0].IsEndOfSnapshot())
// Now subscriber should block waiting for updates
assertNoResult(t, eventCh)
events := []Event{{
Topic: testTopic,
Key: "sub-key",
Payload: "the-published-event-payload",
}}
publisher.Publish(events)
// Subscriber should see the published event
result = nextResult(t, eventCh)
require.NoError(t, result.Err)
expected = []Event{{Payload: "the-published-event-payload", Key: "sub-key", Topic: testTopic}}
require.Equal(t, expected, result.Events)
}
func newTestSnapshotHandlers() SnapshotHandlers {
return SnapshotHandlers{
testTopic: func(req *SubscribeRequest, buf SnapshotAppender) (uint64, error) {
if req.Topic != testTopic {
return 0, fmt.Errorf("unexpected topic: %v", req.Topic)
}
buf.Append([]Event{{Payload: "snapshot-event-payload", Key: "sub-key"}})
return 1, nil
},
}
}
func consumeSubscription(ctx context.Context, sub *Subscription) <-chan subNextResult {
eventCh := make(chan subNextResult, 1)
go func() {
for {
es, err := sub.Next(ctx)
eventCh <- subNextResult{
Events: es,
Err: err,
}
if err != nil {
return
}
}
}()
return eventCh
}
type subNextResult struct {
Events []Event
Err error
}
func nextResult(t *testing.T, eventCh <-chan subNextResult) subNextResult {
t.Helper()
select {
case next := <-eventCh:
return next
case <-time.After(100 * time.Millisecond):
t.Fatalf("no event after 100ms")
}
return subNextResult{}
}
func assertNoResult(t *testing.T, eventCh <-chan subNextResult) {
t.Helper()
select {
case next := <-eventCh:
require.NoError(t, next.Err)
require.Len(t, next.Events, 1)
t.Fatalf("received unexpected event: %#v", next.Events[0].Payload)
case <-time.After(100 * time.Millisecond):
}
}


@ -0,0 +1,100 @@
package stream
// eventSnapshot represents the state of memdb for a given topic and key at some
// point in time. It is modelled as a buffer of events so that snapshots can be
// streamed to possibly multiple subscribers concurrently, and can be trivially
// cached by retaining a reference to a Snapshot. Once the reference to eventSnapshot
// is dropped from memory, any subscribers still reading from it may do so by following
// their pointers. When the last subscriber unsubscribes, the snapshot is garbage
// collected automatically by Go's runtime. This simplifies snapshot and buffer
// management dramatically.
type eventSnapshot struct {
// Head is the first item in the buffer containing the snapshot. Once the
// snapshot is complete, subsequent BufferItems are appended to snapBuffer,
// so that subscribers receive all the events from the same buffer.
Head *bufferItem
// snapBuffer is the snapshot buffer that the snapshot fn writes to.
snapBuffer *eventBuffer
}
type snapFunc func(req *SubscribeRequest, buf SnapshotAppender) (uint64, error)
// newEventSnapshot creates a snapshot buffer based on the subscription request.
// The current buffer head for the topic requested is passed so that once the
// snapshot is complete and has been delivered into the buffer, any events
// published during snapshotting can be immediately appended and won't be
// missed. Once the snapshot is delivered the topic buffer is spliced onto the
// snapshot buffer so that subscribers will naturally follow from the snapshot
// to wait for any subsequent updates.
func newEventSnapshot(req *SubscribeRequest, topicBufferHead *bufferItem, fn snapFunc) *eventSnapshot {
buf := newEventBuffer()
s := &eventSnapshot{
Head: buf.Head(),
snapBuffer: buf,
}
go func() {
idx, err := fn(req, s.snapBuffer)
if err != nil {
s.snapBuffer.AppendItem(&bufferItem{Err: err})
return
}
// We wrote the snapshot events to the buffer, send the "end of snapshot" event
s.snapBuffer.Append([]Event{{
Topic: req.Topic,
Key: req.Key,
Index: idx,
Payload: endOfSnapshot{},
}})
s.spliceFromTopicBuffer(topicBufferHead, idx)
}()
return s
}
func (s *eventSnapshot) spliceFromTopicBuffer(topicBufferHead *bufferItem, idx uint64) {
// Now splice on the topic buffer. We need to iterate through the buffer to
// find the first event after the current snapshot.
item := topicBufferHead
for {
next := item.NextNoBlock()
switch {
case next == nil:
// This is the head of the topic buffer (or was a moment ago, after the
// snapshot completed). We don't want any of the events (if any) already in
// the topic buffer as they came before the snapshot, but we do need to
// wait for the next update.
s.snapBuffer.AppendItem(item.NextLink())
return
case next.Err != nil:
// This case is not currently possible because errors can only come
// from a snapshot func, and this is consuming events from a topic
// buffer which does not contain a snapshot.
// Handle this case anyway in case errors can come from other places
// in the future.
s.snapBuffer.AppendItem(next)
return
case len(next.Events) > 0 && next.Events[0].Index > idx:
// We've found an update in the topic buffer that happened after our
// snapshot was taken, splice it into the snapshot buffer so subscribers
// can continue to read this and others after it.
s.snapBuffer.AppendItem(next)
return
}
// We don't need this item, continue to next
item = next
}
}
// err returns an error if the snapshot func has failed, or nil otherwise.
// A nil return doesn't necessarily mean there won't be an error later, only
// that there hasn't been one yet.
func (s *eventSnapshot) err() error {
// Fetch the head of the buffer, this is atomic. If the snapshot func errored
// then the last event will be an error.
head := s.snapBuffer.Head()
return head.Err
}
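
A sketch of the snapshot/splice hand-off described above (illustration only, written as if inside the stream package; the indexes, key, and payloads are placeholders):

func exampleSnapshot(topic Topic) *eventSnapshot {
    topicBuf := newEventBuffer() // normally the topic buffer held by EventPublisher

    snapFn := func(req *SubscribeRequest, buf SnapshotAppender) (uint64, error) {
        // Stream the existing state into the snapshot buffer, then report the
        // index at which the snapshot is complete.
        buf.Append([]Event{{Topic: req.Topic, Key: req.Key, Index: 5, Payload: "existing-state"}})
        return 5, nil
    }

    // Subscribers read from snap.Head; after the end-of-snapshot event they
    // transparently continue into topicBuf for any events with Index > 5.
    snap := newEventSnapshot(&SubscribeRequest{Topic: topic, Key: "web"}, topicBuf.Head(), snapFn)
    return snap
}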


@ -0,0 +1,176 @@
package stream
import (
"context"
fmt "fmt"
"testing"
time "time"
"github.com/stretchr/testify/require"
)
func TestEventSnapshot(t *testing.T) {
// Setup a dummy state that we can manipulate easily. The properties we care
// about are that we publish some sequence of events as a snapshot and then
// follow them up with "live updates". We control the interleaving. Our state
// consists of health events (only type fully defined so far) for service
// instances with consecutive ID numbers starting from 0 (e.g. test-000,
// test-001). The snapshot is delivered at index 1000. updatesBeforeSnap
// controls how many updates are delivered _before_ the snapshot is complete
// (with an index < 1000). updatesAfterSnap controls the number of updates
// delivered after (index > 1000).
//
// In all cases the invariant should be that we end up with all of the
// instances in the snapshot, plus any delivered _after_ the snapshot index,
// but none delivered _before_ the snapshot index otherwise we may have an
// inconsistent snapshot.
cases := []struct {
name string
snapshotSize int
updatesBeforeSnap int
updatesAfterSnap int
}{
{
name: "snapshot with subsequent mutations",
snapshotSize: 10,
updatesBeforeSnap: 0,
updatesAfterSnap: 10,
},
{
name: "snapshot with concurrent mutations",
snapshotSize: 10,
updatesBeforeSnap: 5,
updatesAfterSnap: 5,
},
{
name: "empty snapshot with subsequent mutations",
snapshotSize: 0,
updatesBeforeSnap: 0,
updatesAfterSnap: 10,
},
{
name: "empty snapshot with concurrent mutations",
snapshotSize: 0,
updatesBeforeSnap: 5,
updatesAfterSnap: 5,
},
}
snapIndex := uint64(1000)
for _, tc := range cases {
tc := tc
t.Run(tc.name, func(t *testing.T) {
require.True(t, tc.updatesBeforeSnap < 999,
"bad test param updatesBeforeSnap must be less than the snapshot"+
" index (%d) minus one (%d), got: %d", snapIndex, snapIndex-1,
tc.updatesBeforeSnap)
// Create a snapshot func that will deliver registration events.
snFn := testHealthConsecutiveSnapshotFn(tc.snapshotSize, snapIndex)
// Create a topic buffer for updates
tb := newEventBuffer()
// Capture the topic buffer head now so updatesBeforeSnap are "concurrent"
// and are seen by the eventSnapshot once it completes the snap.
tbHead := tb.Head()
// Deliver any pre-snapshot events simulating updates that occur after the
// topic buffer is captured during a Subscribe call, but before the
// snapshot is made of the FSM.
for i := tc.updatesBeforeSnap; i > 0; i-- {
index := snapIndex - uint64(i)
// Use an instance index that's unique and should never appear in the
// output so we can be sure these were not included as they came before
// the snapshot.
tb.Append([]Event{newDefaultHealthEvent(index, 10000+i)})
}
// Create eventSnapshot, (will call snFn in another goroutine). The
// Request is ignored by the snapFunc so doesn't matter for now.
es := newEventSnapshot(&SubscribeRequest{}, tbHead, snFn)
// Deliver any post-snapshot events simulating updates that occur
// logically after snapshot. It doesn't matter that these might actually
// be appended before the snapshot fn executes in another goroutine since
// it's operating on a possibly stale "snapshot". This is the same as
// reality with the state store, where updates that occur after the
// snapshot is taken but while the snapshot fn is still running must be captured
// correctly.
for i := 0; i < tc.updatesAfterSnap; i++ {
index := snapIndex + 1 + uint64(i)
// Use an instance index that's unique.
tb.Append([]Event{newDefaultHealthEvent(index, 20000+i)})
}
// Now read the snapshot buffer until we've received everything we expect.
// Don't wait too long in case we get stuck.
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
snapIDs := make([]string, 0, tc.snapshotSize)
updateIDs := make([]string, 0, tc.updatesAfterSnap)
snapDone := false
curItem := es.Head
var err error
RECV:
for {
curItem, err = curItem.Next(ctx, nil)
// This error is typically a timeout, so dump the state to aid debugging.
require.NoError(t, err,
"current state: snapDone=%v snapIDs=%s updateIDs=%s", snapDone,
snapIDs, updateIDs)
e := curItem.Events[0]
switch {
case snapDone:
payload, ok := e.Payload.(string)
require.True(t, ok, "want health event got: %#v", e.Payload)
updateIDs = append(updateIDs, payload)
if len(updateIDs) == tc.updatesAfterSnap {
// We're done!
break RECV
}
case e.IsEndOfSnapshot():
snapDone = true
default:
payload, ok := e.Payload.(string)
require.True(t, ok, "want health event got: %#v", e.Payload)
snapIDs = append(snapIDs, payload)
}
}
// Validate the event IDs we got delivered.
require.Equal(t, genSequentialIDs(0, tc.snapshotSize), snapIDs)
require.Equal(t, genSequentialIDs(20000, 20000+tc.updatesAfterSnap), updateIDs)
})
}
}
func genSequentialIDs(start, end int) []string {
ids := make([]string, 0, end-start)
for i := start; i < end; i++ {
ids = append(ids, fmt.Sprintf("test-event-%03d", i))
}
return ids
}
func testHealthConsecutiveSnapshotFn(size int, index uint64) snapFunc {
return func(req *SubscribeRequest, buf SnapshotAppender) (uint64, error) {
for i := 0; i < size; i++ {
// Event content is arbitrary; we are just using Health because it's the
// first type defined. We just want a set of things with consecutive
// names.
buf.Append([]Event{newDefaultHealthEvent(index, i)})
}
return index, nil
}
}
func newDefaultHealthEvent(index uint64, n int) Event {
return Event{
Index: index,
Topic: testTopic,
Payload: fmt.Sprintf("test-event-%03d", n),
}
}

View File

@ -0,0 +1,17 @@
package stream
import (
"testing"
"github.com/stretchr/testify/require"
)
func TestEvent_IsEndOfSnapshot(t *testing.T) {
e := Event{Payload: endOfSnapshot{}}
require.True(t, e.IsEndOfSnapshot())
t.Run("not EndOfSnapshot", func(t *testing.T) {
e := Event{Payload: endOfEmptySnapshot{}}
require.False(t, e.IsEndOfSnapshot())
})
}

View File

@ -0,0 +1,136 @@
package stream
import (
"context"
"errors"
"sync/atomic"
)
const (
// subscriptionStateOpen is the default state of a subscription. An open
// subscription may receive new events.
subscriptionStateOpen uint32 = 0
// subscriptionStateClosed indicates that the subscription was closed, possibly
// as a result of a change to an ACL token, and will not receive new events.
// The subscriber must issue a new Subscribe request.
subscriptionStateClosed uint32 = 1
)
// ErrSubscriptionClosed is an error signalling that the subscription has been
// closed. The client should Unsubscribe, then re-Subscribe.
var ErrSubscriptionClosed = errors.New("subscription closed by server, client should resubscribe")
// Subscription provides events on a Topic. Events may be filtered by Key.
// Events are returned by Next(), and may start with a Snapshot of events.
type Subscription struct {
// state is accessed atomically: 0 means open, 1 means closed and the subscriber must resubscribe
state uint32
// req is the request that we are responding to
req *SubscribeRequest
// currentItem stores the current snapshot or topic buffer item we are on. It
// is mutated by calls to Next.
currentItem *bufferItem
// forceClosed is closed when forceClose is called. It is used by
// EventPublisher to cancel Next().
forceClosed chan struct{}
// unsub is a function set by EventPublisher that is called to free resources
// when the subscription is no longer needed.
// It must be safe to call the function from multiple goroutines and the function
// must be idempotent.
unsub func()
}
// SubscribeRequest identifies the types of events the subscriber would like to
// receive. Topic and Token are required.
type SubscribeRequest struct {
Topic Topic
Key string
Token string
Index uint64
}
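// An illustrative sketch, not part of this change set: building a request to
// stream events for a single key, resuming from a known index. The topic, key,
// token, and index values are made-up stand-ins.
func exampleSubscribeRequest(topic Topic) *SubscribeRequest {
	return &SubscribeRequest{
		Topic: topic,
		Key:   "web",
		Token: "example-acl-secret",
		Index: 1234,
	}
}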
// newSubscription returns a new subscription. The caller is responsible for
// calling Unsubscribe when it is done with the subscription, to free resources.
func newSubscription(req *SubscribeRequest, item *bufferItem, unsub func()) *Subscription {
return &Subscription{
forceClosed: make(chan struct{}),
req: req,
currentItem: item,
unsub: unsub,
}
}
// Next returns the next set of events to deliver. It must not be called
// concurrently from multiple goroutines, as it mutates the Subscription.
func (s *Subscription) Next(ctx context.Context) ([]Event, error) {
if atomic.LoadUint32(&s.state) == subscriptionStateClosed {
return nil, ErrSubscriptionClosed
}
for {
next, err := s.currentItem.Next(ctx, s.forceClosed)
switch {
case err != nil && atomic.LoadUint32(&s.state) == subscriptionStateClosed:
return nil, ErrSubscriptionClosed
case err != nil:
return nil, err
}
s.currentItem = next
events := s.filter(next.Events)
if len(events) == 0 {
continue
}
return events, nil
}
}
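// An illustrative sketch, not part of this change set: a consumer loop driving
// Next. The handle callback is a hypothetical stand-in for delivering events to
// a client; on ErrSubscriptionClosed the caller is expected to resubscribe.
func exampleConsume(ctx context.Context, sub *Subscription, handle func([]Event)) error {
	defer sub.Unsubscribe()
	for {
		events, err := sub.Next(ctx)
		if err != nil {
			// err is ErrSubscriptionClosed if the server force-closed the
			// subscription (e.g. after an ACL change), or a context error if
			// ctx was cancelled.
			return err
		}
		handle(events)
	}
}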
// TODO: test cases for this method
func (s *Subscription) filter(events []Event) []Event {
if s.req.Key == "" || len(events) == 0 {
return events
}
allMatch := true
for _, e := range events {
if s.req.Key != e.Key {
allMatch = false
break
}
}
// Only allocate a new slice if some events need to be filtered out
if allMatch {
return events
}
// FIXME: this will over-allocate. We could get a count from the previous range
// over events.
filtered := make([]Event, 0, len(events))
for _, e := range events {
if s.req.Key == e.Key {
filtered = append(filtered, e)
}
}
return filtered
}
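// An illustrative sketch, not part of this change set: the Key filtering
// semantics applied by Next. The topic value is a made-up stand-in.
func exampleFilterByKey(topic Topic) []Event {
	sub := newSubscription(&SubscribeRequest{Topic: topic, Key: "web"}, nil, func() {})
	events := []Event{
		{Topic: topic, Key: "web", Index: 1},
		{Topic: topic, Key: "db", Index: 1},
	}
	// Only the event whose Key matches the request survives; the "db" event
	// is dropped.
	return sub.filter(events)
}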
// forceClose closes the subscription. Subscribers will receive an error when
// they call Next, and will need to perform a new Subscribe request.
// It is safe to call from any goroutine.
func (s *Subscription) forceClose() {
swapped := atomic.CompareAndSwapUint32(&s.state, subscriptionStateOpen, subscriptionStateClosed)
if swapped {
close(s.forceClosed)
}
}
// Unsubscribe the subscription, freeing resources.
func (s *Subscription) Unsubscribe() {
s.unsub()
}

View File

@ -0,0 +1,150 @@
package stream
import (
"context"
"testing"
time "time"
"github.com/stretchr/testify/require"
)
func noopUnSub() {}
func TestSubscription(t *testing.T) {
eb := newEventBuffer()
index := uint64(100)
startHead := eb.Head()
// Start with an event in the buffer
publishTestEvent(index, eb, "test")
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
// Create a subscription
req := &SubscribeRequest{
Topic: testTopic,
Key: "test",
}
sub := newSubscription(req, startHead, noopUnSub)
// First call to sub.Next should return our published event immediately
start := time.Now()
got, err := sub.Next(ctx)
elapsed := time.Since(start)
require.NoError(t, err)
require.True(t, elapsed < 200*time.Millisecond,
"Event should have been delivered immediately, took %s", elapsed)
require.Len(t, got, 1)
require.Equal(t, index, got[0].Index)
// Schedule an event publish in a while
index++
start = time.Now()
time.AfterFunc(200*time.Millisecond, func() {
publishTestEvent(index, eb, "test")
})
// Next call should block until event is delivered
got, err = sub.Next(ctx)
elapsed = time.Since(start)
require.NoError(t, err)
require.True(t, elapsed > 200*time.Millisecond,
"Event should have been delivered after blocking 200ms, took %s", elapsed)
require.True(t, elapsed < 2*time.Second,
"Event should have been delivered after short time, took %s", elapsed)
require.Len(t, got, 1)
require.Equal(t, index, got[0].Index)
// Event with wrong key should not be delivered. Deliver a good message right
// after it so we don't have to block the test thread forever or call cancel yet.
index++
publishTestEvent(index, eb, "nope")
index++
publishTestEvent(index, eb, "test")
start = time.Now()
got, err = sub.Next(ctx)
elapsed = time.Since(start)
require.NoError(t, err)
require.True(t, elapsed < 200*time.Millisecond,
"Event should have been delivered immediately, took %s", elapsed)
require.Len(t, got, 1)
require.Equal(t, index, got[0].Index)
require.Equal(t, "test", got[0].Key)
// Cancelling the subscription context should unblock Next
start = time.Now()
time.AfterFunc(200*time.Millisecond, func() {
cancel()
})
_, err = sub.Next(ctx)
elapsed = time.Since(start)
require.Error(t, err)
require.True(t, elapsed > 200*time.Millisecond,
"Event should have been delivered after blocking 200ms, took %s", elapsed)
require.True(t, elapsed < 2*time.Second,
"Event should have been delivered after short time, took %s", elapsed)
}
func TestSubscription_Close(t *testing.T) {
eb := newEventBuffer()
index := uint64(100)
startHead := eb.Head()
// Start with an event in the buffer
publishTestEvent(index, eb, "test")
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
// Create a subscription
req := &SubscribeRequest{
Topic: testTopic,
Key: "test",
}
sub := newSubscription(req, startHead, noopUnSub)
// First call to sub.Next should return our published event immediately
start := time.Now()
got, err := sub.Next(ctx)
elapsed := time.Since(start)
require.NoError(t, err)
require.True(t, elapsed < 200*time.Millisecond,
"Event should have been delivered immediately, took %s", elapsed)
require.Len(t, got, 1)
require.Equal(t, index, got[0].Index)
// Schedule a Close simulating the server deciding this subscription
// needs to reset (e.g. on ACL perm change).
start = time.Now()
time.AfterFunc(200*time.Millisecond, func() {
sub.forceClose()
})
_, err = sub.Next(ctx)
elapsed = time.Since(start)
require.Error(t, err)
require.Equal(t, ErrSubscriptionClosed, err)
require.True(t, elapsed > 200*time.Millisecond,
"Reload should have happened after blocking 200ms, took %s", elapsed)
require.True(t, elapsed < 2*time.Second,
"Reload should have been delivered after short time, took %s", elapsed)
}
func publishTestEvent(index uint64, b *eventBuffer, key string) {
// Don't care about the event payload for now, just the semantics of publishing
// something. This is not a valid stream in the end-to-end streaming protocol
// but enough to test subscription mechanics.
e := Event{
Index: index,
Topic: testTopic,
Key: key,
}
b.Append([]Event{e})
}