diff --git a/agent/consul/state/acl_events.go b/agent/consul/state/acl_events.go new file mode 100644 index 0000000000..f033e880f9 --- /dev/null +++ b/agent/consul/state/acl_events.go @@ -0,0 +1,81 @@ +package state + +import ( + "github.com/hashicorp/consul/agent/agentpb" + "github.com/hashicorp/consul/agent/structs" + memdb "github.com/hashicorp/go-memdb" +) + +// ACLEventsFromChanges returns all the ACL token, policy or role events that +// should be emitted given a set of changes to the state store. +func (s *Store) ACLEventsFromChanges(tx *txn, changes memdb.Changes) ([]agentpb.Event, error) { + + // Don't allocate yet since in majority of update transactions no ACL token + // will be changed. + var events []agentpb.Event + + getObj := func(change memdb.Change) interface{} { + if change.Deleted() { + return change.Before + } + return change.After + } + + getOp := func(change memdb.Change) agentpb.ACLOp { + if change.Deleted() { + return agentpb.ACLOp_Delete + } + return agentpb.ACLOp_Update + } + + for _, change := range changes { + switch change.Table { + case "acl-tokens": + token := getObj(change).(*structs.ACLToken) + e := agentpb.Event{ + Topic: agentpb.Topic_ACLTokens, + Index: tx.Index, + Payload: &agentpb.Event_ACLToken{ + ACLToken: &agentpb.ACLTokenUpdate{ + Op: getOp(change), + Token: &agentpb.ACLTokenIdentifier{ + AccessorID: token.AccessorID, + SecretID: token.SecretID, + }, + }, + }, + } + events = append(events, e) + case "acl-policies": + policy := getObj(change).(*structs.ACLPolicy) + e := agentpb.Event{ + Topic: agentpb.Topic_ACLPolicies, + Index: tx.Index, + Payload: &agentpb.Event_ACLPolicy{ + ACLPolicy: &agentpb.ACLPolicyUpdate{ + Op: getOp(change), + PolicyID: policy.ID, + }, + }, + } + events = append(events, e) + case "acl-roles": + role := getObj(change).(*structs.ACLRole) + e := agentpb.Event{ + Topic: agentpb.Topic_ACLRoles, + Index: tx.Index, + Payload: &agentpb.Event_ACLRole{ + ACLRole: &agentpb.ACLRoleUpdate{ + Op: getOp(change), + RoleID: role.ID, + }, + }, + } + events = append(events, e) + default: + continue + } + } + + return events, nil +} diff --git a/agent/consul/state/acl_events_test.go b/agent/consul/state/acl_events_test.go new file mode 100644 index 0000000000..e471042b8e --- /dev/null +++ b/agent/consul/state/acl_events_test.go @@ -0,0 +1,342 @@ +package state + +import ( + "strconv" + "strings" + "testing" + + "github.com/hashicorp/consul/agent/agentpb" + "github.com/hashicorp/consul/agent/structs" + "github.com/stretchr/testify/require" +) + +func testACLTokenEvent(t *testing.T, idx uint64, n int, delete bool) agentpb.Event { + t.Helper() + uuid := strings.ReplaceAll("11111111-????-????-????-????????????", "?", + strconv.Itoa(n)) + op := agentpb.ACLOp_Update + if delete { + op = agentpb.ACLOp_Delete + } + return agentpb.Event{ + Topic: agentpb.Topic_ACLTokens, + Index: idx, + Payload: &agentpb.Event_ACLToken{ + ACLToken: &agentpb.ACLTokenUpdate{ + Op: op, + Token: &agentpb.ACLTokenIdentifier{ + AccessorID: uuid, + SecretID: uuid, + }, + }, + }, + } +} + +func testACLPolicyEvent(t *testing.T, idx uint64, n int, delete bool) agentpb.Event { + t.Helper() + uuid := strings.ReplaceAll("22222222-????-????-????-????????????", "?", + strconv.Itoa(n)) + op := agentpb.ACLOp_Update + if delete { + op = agentpb.ACLOp_Delete + } + return agentpb.Event{ + Topic: agentpb.Topic_ACLPolicies, + Index: idx, + Payload: &agentpb.Event_ACLPolicy{ + ACLPolicy: &agentpb.ACLPolicyUpdate{ + Op: op, + PolicyID: uuid, + }, + }, + } +} + +func testACLRoleEvent(t 
*testing.T, idx uint64, n int, delete bool) agentpb.Event { + t.Helper() + uuid := strings.ReplaceAll("33333333-????-????-????-????????????", "?", + strconv.Itoa(n)) + op := agentpb.ACLOp_Update + if delete { + op = agentpb.ACLOp_Delete + } + return agentpb.Event{ + Topic: agentpb.Topic_ACLRoles, + Index: idx, + Payload: &agentpb.Event_ACLRole{ + ACLRole: &agentpb.ACLRoleUpdate{ + Op: op, + RoleID: uuid, + }, + }, + } +} + +func testToken(t *testing.T, n int) *structs.ACLToken { + uuid := strings.ReplaceAll("11111111-????-????-????-????????????", "?", + strconv.Itoa(n)) + return &structs.ACLToken{ + AccessorID: uuid, + SecretID: uuid, + } +} + +func testPolicy(t *testing.T, n int) *structs.ACLPolicy { + numStr := strconv.Itoa(n) + uuid := strings.ReplaceAll("22222222-????-????-????-????????????", "?", numStr) + return &structs.ACLPolicy{ + ID: uuid, + Name: "test_policy_" + numStr, + Rules: `operator = "read"`, + } +} + +func testRole(t *testing.T, n, p int) *structs.ACLRole { + numStr := strconv.Itoa(n) + uuid := strings.ReplaceAll("33333333-????-????-????-????????????", "?", numStr) + policy := testPolicy(t, p) + return &structs.ACLRole{ + ID: uuid, + Name: "test_role_" + numStr, + Policies: []structs.ACLRolePolicyLink{{ + ID: policy.ID, + Name: policy.Name, + }}, + } +} + +func TestACLEventsFromChanges(t *testing.T) { + cases := []struct { + Name string + Setup func(s *Store, tx *txn) error + Mutate func(s *Store, tx *txn) error + WantEvents []agentpb.Event + WantErr bool + }{ + { + Name: "token create", + Mutate: func(s *Store, tx *txn) error { + if err := s.aclTokenSetTxn(tx, tx.Index, testToken(t, 1), false, false, false, false); err != nil { + return err + } + return nil + }, + WantEvents: []agentpb.Event{ + testACLTokenEvent(t, 100, 1, false), + }, + WantErr: false, + }, + { + Name: "token update", + Setup: func(s *Store, tx *txn) error { + if err := s.aclTokenSetTxn(tx, tx.Index, testToken(t, 1), false, false, false, false); err != nil { + return err + } + return nil + }, + Mutate: func(s *Store, tx *txn) error { + // Add a policy to the token (never mind it doesn't exist for now) we + // allow it in the set command below. 
+ token := testToken(t, 1) + token.Policies = []structs.ACLTokenPolicyLink{{ID: "33333333-1111-1111-1111-111111111111"}} + if err := s.aclTokenSetTxn(tx, tx.Index, token, false, true, false, false); err != nil { + return err + } + return nil + }, + WantEvents: []agentpb.Event{ + // Should see an event from the update + testACLTokenEvent(t, 100, 1, false), + }, + WantErr: false, + }, + { + Name: "token delete", + Setup: func(s *Store, tx *txn) error { + if err := s.aclTokenSetTxn(tx, tx.Index, testToken(t, 1), false, false, false, false); err != nil { + return err + } + return nil + }, + Mutate: func(s *Store, tx *txn) error { + // Delete it + token := testToken(t, 1) + if err := s.aclTokenDeleteTxn(tx, tx.Index, token.AccessorID, "id", nil); err != nil { + return err + } + return nil + }, + WantEvents: []agentpb.Event{ + // Should see a delete event + testACLTokenEvent(t, 100, 1, true), + }, + WantErr: false, + }, + { + Name: "policy create", + Mutate: func(s *Store, tx *txn) error { + if err := s.aclPolicySetTxn(tx, tx.Index, testPolicy(t, 1)); err != nil { + return err + } + return nil + }, + WantEvents: []agentpb.Event{ + testACLPolicyEvent(t, 100, 1, false), + }, + WantErr: false, + }, + { + Name: "policy update", + Setup: func(s *Store, tx *txn) error { + if err := s.aclPolicySetTxn(tx, tx.Index, testPolicy(t, 1)); err != nil { + return err + } + return nil + }, + Mutate: func(s *Store, tx *txn) error { + policy := testPolicy(t, 1) + policy.Rules = `operator = "write"` + if err := s.aclPolicySetTxn(tx, tx.Index, policy); err != nil { + return err + } + return nil + }, + WantEvents: []agentpb.Event{ + // Should see an event from the update + testACLPolicyEvent(t, 100, 1, false), + }, + WantErr: false, + }, + { + Name: "policy delete", + Setup: func(s *Store, tx *txn) error { + if err := s.aclPolicySetTxn(tx, tx.Index, testPolicy(t, 1)); err != nil { + return err + } + return nil + }, + Mutate: func(s *Store, tx *txn) error { + // Delete it + policy := testPolicy(t, 1) + if err := s.aclPolicyDeleteTxn(tx, tx.Index, policy.ID, s.aclPolicyGetByID, nil); err != nil { + return err + } + return nil + }, + WantEvents: []agentpb.Event{ + // Should see a delete event + testACLPolicyEvent(t, 100, 1, true), + }, + WantErr: false, + }, + { + Name: "role create", + Mutate: func(s *Store, tx *txn) error { + if err := s.aclRoleSetTxn(tx, tx.Index, testRole(t, 1, 1), true); err != nil { + return err + } + return nil + }, + WantEvents: []agentpb.Event{ + testACLRoleEvent(t, 100, 1, false), + }, + WantErr: false, + }, + { + Name: "role update", + Setup: func(s *Store, tx *txn) error { + if err := s.aclRoleSetTxn(tx, tx.Index, testRole(t, 1, 1), true); err != nil { + return err + } + return nil + }, + Mutate: func(s *Store, tx *txn) error { + role := testRole(t, 1, 1) + policy2 := testPolicy(t, 2) + role.Policies = append(role.Policies, structs.ACLRolePolicyLink{ + ID: policy2.ID, + Name: policy2.Name, + }) + if err := s.aclRoleSetTxn(tx, tx.Index, role, true); err != nil { + return err + } + return nil + }, + WantEvents: []agentpb.Event{ + // Should see an event from the update + testACLRoleEvent(t, 100, 1, false), + }, + WantErr: false, + }, + { + Name: "role delete", + Setup: func(s *Store, tx *txn) error { + if err := s.aclRoleSetTxn(tx, tx.Index, testRole(t, 1, 1), true); err != nil { + return err + } + return nil + }, + Mutate: func(s *Store, tx *txn) error { + // Delete it + role := testRole(t, 1, 1) + if err := s.aclRoleDeleteTxn(tx, tx.Index, role.ID, s.aclRoleGetByID, nil); err != nil { + 
return err + } + return nil + }, + WantEvents: []agentpb.Event{ + // Should see a delete event + testACLRoleEvent(t, 100, 1, true), + }, + WantErr: false, + }, + } + + for _, tc := range cases { + tc := tc + t.Run(tc.Name, func(t *testing.T) { + s := testStateStore(t) + + if tc.Setup != nil { + // Bypass the publish mechanism for this test or we get into odd + // recursive stuff... + setupTx := s.db.WriteTxn(10) + require.NoError(t, tc.Setup(s, setupTx)) + // Commit the underlying transaction without using wrapped Commit so we + // avoid the whole event publishing system for setup here. It _should_ + // work but it makes debugging test hard as it will call the function + // under test for the setup data... + setupTx.Txn.Commit() + } + + tx := s.db.WriteTxn(100) + require.NoError(t, tc.Mutate(s, tx)) + + // Note we call the func under test directly rather than publishChanges so + // we can test this in isolation. + got, err := s.ACLEventsFromChanges(tx, tx.Changes()) + if tc.WantErr { + require.Error(t, err) + return + } + require.NoError(t, err) + + // Make sure we have the right events, only taking ordering into account + // where it matters to account for non-determinism. + requireEventsInCorrectPartialOrder(t, tc.WantEvents, got, func(e agentpb.Event) string { + // We only care that events affecting the same actual token are ordered + // with respect ot each other so use it's ID as the key. + switch v := e.Payload.(type) { + case *agentpb.Event_ACLToken: + return "token:" + v.ACLToken.Token.AccessorID + case *agentpb.Event_ACLPolicy: + return "policy:" + v.ACLPolicy.PolicyID + case *agentpb.Event_ACLRole: + return "role:" + v.ACLRole.RoleID + } + return "" + }) + }) + } +} diff --git a/agent/consul/state/stream_publisher.go b/agent/consul/state/stream_publisher.go new file mode 100644 index 0000000000..7b95208296 --- /dev/null +++ b/agent/consul/state/stream_publisher.go @@ -0,0 +1,388 @@ +package state + +import ( + "context" + "fmt" + "sync" + "time" + + "github.com/hashicorp/go-memdb" + "golang.org/x/crypto/blake2b" + + "github.com/hashicorp/consul/agent/agentpb" + "github.com/hashicorp/consul/agent/consul/stream" + "github.com/hashicorp/consul/agent/structs" +) + +type EventPublisher struct { + store *Store + + // topicBufferSize controls how many trailing events we keep in memory for + // each topic to avoid needing to snapshot again for re-connecting clients + // that may have missed some events. It may be zero for no buffering (the most + // recent event is always kept though). TODO + topicBufferSize int + + // snapCacheTTL controls how long we keep snapshots in our cache before + // allowing them to be garbage collected and a new one made for subsequent + // requests for that topic and key. In general this should be pretty short to + // keep memory overhead of duplicated event data low - snapshots are typically + // not that expensive, but having a cache for a few seconds can help + // de-duplicate building the same snapshot over and over again when a + // thundering herd of watchers all subscribe to the same topic within a few + // seconds. TODO + snapCacheTTL time.Duration + + // This lock protects the topicBuffers, snapCache and subsByToken maps. + lock sync.RWMutex + + // topicBuffers stores the head of the linked-list buffer to publish events to + // for a topic. + topicBuffers map[agentpb.Topic]*stream.EventBuffer + + // snapCache stores the head of any snapshot buffers still in cache if caching + // is enabled. 
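+	// The outer map is keyed by topic and the inner map by the subscription's
+	// Key (for example a service name for health topics), so each topic/key
+	// pair holds at most one cached snapshot at a time.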
+ snapCache map[agentpb.Topic]map[string]*stream.EventSnapshot + + // snapFns is the set of snapshot functions that were registered bound to the + // state store. + snapFns map[agentpb.Topic]stream.SnapFn + + // subsByToken stores a list of Subscription objects outstanding indexed by a + // hash of the ACL token they used to subscribe so we can reload them if their + // ACL permissions change. + subsByToken map[string]map[*agentpb.SubscribeRequest]*stream.Subscription + + // commitCh decouples the Commit call in the FSM hot path from distributing + // the resulting events. + commitCh chan commitUpdate +} + +type commitUpdate struct { + tx *txnWrapper + events []agentpb.Event +} + +func NewEventPublisher(store *Store, topicBufferSize int, snapCacheTTL time.Duration) *EventPublisher { + e := &EventPublisher{ + store: store, + topicBufferSize: topicBufferSize, + snapCacheTTL: snapCacheTTL, + topicBuffers: make(map[agentpb.Topic]*stream.EventBuffer), + snapCache: make(map[agentpb.Topic]map[string]*stream.EventSnapshot), + snapFns: make(map[agentpb.Topic]stream.SnapFn), + subsByToken: make(map[string]map[*agentpb.SubscribeRequest]*stream.Subscription), + commitCh: make(chan commitUpdate, 64), + } + + // create a local handler table + // TODO: document why + for topic, handlers := range topicRegistry { + fnCopy := handlers.Snapshot + e.snapFns[topic] = func(req *agentpb.SubscribeRequest, buf *stream.EventBuffer) (uint64, error) { + return fnCopy(e.store, req, buf) + } + } + + go e.handleUpdates() + + return e +} + +func (e *EventPublisher) publishChanges(tx *txn, changes memdb.Changes) error { + var events []agentpb.Event + for topic, th := range topicRegistry { + if th.ProcessChanges != nil { + es, err := th.ProcessChanges(e.store, tx, changes) + if err != nil { + return fmt.Errorf("failed generating events for topic %q: %s", topic, err) + } + events = append(events, es...) + } + } + e.commitCh <- commitUpdate{ + // TODO: document why it must be created here, and not in the new thread + // + // Create a new transaction since it's going to be used from a different + // thread. Transactions aren't thread safe but it's OK to create it here + // since we won't try to use it in this thread and pass it straight to the + // handler which will own it exclusively. + tx: e.store.db.Txn(false), + events: events, + } + return nil +} + +func (e *EventPublisher) handleUpdates() { + for { + update := <-e.commitCh + e.sendEvents(update) + } +} + +// sendEvents sends the given events to any applicable topic listeners, as well +// as any ACL update events to cause affected listeners to reset their stream. +func (e *EventPublisher) sendEvents(update commitUpdate) { + e.lock.Lock() + defer e.lock.Unlock() + + // Always abort the transaction. This is not strictly necessary with memDB + // because once we drop the reference to the Txn object, the radix nodes will + // be GCed anyway but it's hygienic incase memDB ever has a different + // implementation. + defer update.tx.Abort() + + eventsByTopic := make(map[agentpb.Topic][]agentpb.Event) + + for _, event := range update.events { + // If the event is an ACL update, treat it as a special case. Currently + // ACL update events are only used internally to recognize when a subscriber + // should reload its subscription. + if event.Topic == agentpb.Topic_ACLTokens || + event.Topic == agentpb.Topic_ACLPolicies || + event.Topic == agentpb.Topic_ACLRoles { + + if err := e.handleACLUpdate(update.tx, event); err != nil { + // This seems pretty drastic? What would be better. 
It's not super safe + // to continue since we might have missed some ACL update and so leak + // data to unauthorized clients but crashing whole server also seems + // bad. I wonder if we could send a "reset" to all subscribers instead + // and effectively re-start all subscriptions to be on the safe side + // without just crashing? + // TODO(banks): reset all instead of panic? + panic(err) + } + + continue + } + + // Split events by topic to deliver. + eventsByTopic[event.Topic] = append(eventsByTopic[event.Topic], event) + } + + // Deliver events + for topic, events := range eventsByTopic { + buf, ok := e.topicBuffers[topic] + if !ok { + buf = stream.NewEventBuffer() + e.topicBuffers[topic] = buf + } + buf.Append(events) + } +} + +// handleACLUpdate handles an ACL token/policy/role update. This method assumes +// the lock is held. +func (e *EventPublisher) handleACLUpdate(tx *txn, event agentpb.Event) error { + switch event.Topic { + case agentpb.Topic_ACLTokens: + token := event.GetACLToken() + subs := e.subsByToken[secretHash(token.Token.SecretID)] + for _, sub := range subs { + sub.CloseReload() + } + case agentpb.Topic_ACLPolicies: + policy := event.GetACLPolicy() + // TODO(streaming) figure out how to thread method/ent meta here for + // namespace support in Ent. Probably need wildcard here? + tokens, err := e.store.aclTokenListByPolicy(tx, policy.PolicyID, nil) + if err != nil { + return err + } + + // Loop through the tokens used by the policy. + for token := tokens.Next(); token != nil; token = tokens.Next() { + token := token.(*structs.ACLToken) + if subs, ok := e.subsByToken[secretHash(token.SecretID)]; ok { + for _, sub := range subs { + sub.CloseReload() + } + } + } + + // Find any roles using this policy so tokens with those roles can be reloaded. + roles, err := e.store.aclRoleListByPolicy(tx, policy.PolicyID, nil) + if err != nil { + return err + } + for role := roles.Next(); role != nil; role = roles.Next() { + role := role.(*structs.ACLRole) + + // TODO(streaming) figure out how to thread method/ent meta here for + // namespace support in Ent. + tokens, err := e.store.aclTokenListByRole(tx, role.ID, nil) + if err != nil { + return err + } + for token := tokens.Next(); token != nil; token = tokens.Next() { + token := token.(*structs.ACLToken) + if subs, ok := e.subsByToken[secretHash(token.SecretID)]; ok { + for _, sub := range subs { + sub.CloseReload() + } + } + } + } + + case agentpb.Topic_ACLRoles: + role := event.GetACLRole() + // TODO(streaming) figure out how to thread method/ent meta here for + // namespace support in Ent. + tokens, err := e.store.aclTokenListByRole(tx, role.RoleID, nil) + if err != nil { + return err + } + for token := tokens.Next(); token != nil; token = tokens.Next() { + token := token.(*structs.ACLToken) + if subs, ok := e.subsByToken[secretHash(token.SecretID)]; ok { + for _, sub := range subs { + sub.CloseReload() + } + } + } + } + + return nil +} + +// secretHash returns a 256-bit Blake2 hash of the given string. +func secretHash(token string) string { + hash, err := blake2b.New256(nil) + if err != nil { + panic(err) + } + hash.Write([]byte(token)) + return string(hash.Sum(nil)) +} + +// Subscribe returns a new stream.Subscription for the given request. A +// subscription will stream an initial snapshot of events matching the request +// if required and then block until new events that modify the request occur, or +// the context is cancelled. 
Subscriptions may be forced to reset if the server +// decides it can no longer maintain correct operation for example if ACL +// policies changed or the state store was restored. +// +// When the called is finished with the subscription for any reason, it must +// call Unsubscribe to free ACL tracking resources. +func (e *EventPublisher) Subscribe(ctx context.Context, + req *agentpb.SubscribeRequest) (*stream.Subscription, error) { + // Ensure we know how to make a snapshot for this topic + _, ok := topicRegistry[req.Topic] + if !ok { + return nil, fmt.Errorf("unknown topic %s", req.Topic) + } + + e.lock.Lock() + defer e.lock.Unlock() + + // Ensure there is a topic buffer for that topic so we start capturing any + // future published events. + buf, ok := e.topicBuffers[req.Topic] + if !ok { + buf = stream.NewEventBuffer() + e.topicBuffers[req.Topic] = buf + } + + // See if we need a snapshot + topicHead := buf.Head() + var sub *stream.Subscription + if req.Index > 0 && len(topicHead.Events) > 0 && topicHead.Events[0].Index == req.Index { + // No need for a snapshot just send the "end snapshot" message to signal to + // client it's cache is still good. (note that this can be distinguished + // from a legitimate empty snapshot due to the index matching the one the + // client sent), then follow along from here in the topic. + e := agentpb.Event{ + Index: req.Index, + Topic: req.Topic, + Key: req.Key, + Payload: &agentpb.Event_ResumeStream{ResumeStream: true}, + } + // Make a new buffer to send to the client containing the resume. + buf := stream.NewEventBuffer() + + // Store the head of that buffer before we append to it to give as the + // starting point for the subscription. + subHead := buf.Head() + + buf.Append([]agentpb.Event{e}) + + // Now splice the rest of the topic buffer on so the subscription will + // continue to see future updates in the topic buffer. + follow, err := topicHead.FollowAfter() + if err != nil { + return nil, err + } + buf.AppendBuffer(follow) + + sub = stream.NewSubscription(ctx, req, subHead) + } else { + snap, err := e.getSnapshotLocked(req, topicHead) + if err != nil { + return nil, err + } + sub = stream.NewSubscription(ctx, req, snap.Snap) + } + + // Add the subscription to the ACL token map. + tokenHash := secretHash(req.Token) + subsByToken, ok := e.subsByToken[tokenHash] + if !ok { + subsByToken = make(map[*agentpb.SubscribeRequest]*stream.Subscription) + e.subsByToken[tokenHash] = subsByToken + } + subsByToken[req] = sub + + return sub, nil +} + +// Unsubscribe must be called when a client is no longer interested in a +// subscription to free resources monitoring changes in it's ACL token. The same +// request object passed to Subscribe must be used. 
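As an aside for reviewers (not part of this diff), the Subscribe/Unsubscribe contract described above roughly implies a consumer loop like the following sketch; the function name and error handling are illustrative only.

// exampleConsume is a hypothetical sketch of the intended call pattern:
// subscribe, read events with Next until the server forces a reload, then
// unsubscribe and start over with a fresh snapshot.
func exampleConsume(ctx context.Context, e *EventPublisher, req *agentpb.SubscribeRequest) error {
	for {
		sub, err := e.Subscribe(ctx, req)
		if err != nil {
			return err
		}
		for {
			events, err := sub.Next()
			if err == stream.ErrSubscriptionReload {
				// Server closed the stream (e.g. the token's ACLs changed);
				// drop any cached state and subscribe again from scratch.
				e.Unsubscribe(req)
				break
			}
			if err != nil {
				e.Unsubscribe(req)
				return err
			}
			for range events {
				// Deliver each event to the client here.
			}
		}
	}
}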
+func (e *EventPublisher) Unsubscribe(req *agentpb.SubscribeRequest) { + e.lock.Lock() + defer e.lock.Unlock() + + tokenHash := secretHash(req.Token) + subsByToken, ok := e.subsByToken[tokenHash] + if !ok { + return + } + delete(subsByToken, req) + if len(subsByToken) == 0 { + delete(e.subsByToken, tokenHash) + } +} + +func (e *EventPublisher) getSnapshotLocked(req *agentpb.SubscribeRequest, topicHead *stream.BufferItem) (*stream.EventSnapshot, error) { + // See if there is a cached snapshot + topicSnaps, ok := e.snapCache[req.Topic] + if !ok { + topicSnaps = make(map[string]*stream.EventSnapshot) + e.snapCache[req.Topic] = topicSnaps + } + + snap, ok := topicSnaps[req.Key] + if ok && snap.Err() == nil { + return snap, nil + } + + // No snap or errored snap in cache, create a new one + snapFn, ok := e.snapFns[req.Topic] + if !ok { + return nil, fmt.Errorf("unknown topic %s", req.Topic) + } + + snap = stream.NewEventSnapshot(req, topicHead, snapFn) + if e.snapCacheTTL > 0 { + topicSnaps[req.Key] = snap + + // Trigger a clearout after TTL + time.AfterFunc(e.snapCacheTTL, func() { + e.lock.Lock() + defer e.lock.Unlock() + delete(topicSnaps, req.Key) + }) + } + + return snap, nil +} diff --git a/agent/consul/state/stream_publisher_test.go b/agent/consul/state/stream_publisher_test.go new file mode 100644 index 0000000000..5b85f0d031 --- /dev/null +++ b/agent/consul/state/stream_publisher_test.go @@ -0,0 +1,454 @@ +package state + +import ( + "context" + "testing" + "time" + + "github.com/hashicorp/consul/acl" + "github.com/hashicorp/consul/agent/agentpb" + "github.com/hashicorp/consul/agent/consul/stream" + "github.com/hashicorp/consul/agent/structs" + "github.com/stretchr/testify/require" +) + +type nextResult struct { + Events []agentpb.Event + Err error +} + +func testRunSub(sub *stream.Subscription) <-chan nextResult { + eventCh := make(chan nextResult, 1) + go func() { + for { + es, err := sub.Next() + eventCh <- nextResult{ + Events: es, + Err: err, + } + if err != nil { + return + } + } + }() + return eventCh +} + +func assertNoEvent(t *testing.T, eventCh <-chan nextResult) { + t.Helper() + select { + case next := <-eventCh: + require.NoError(t, next.Err) + require.Len(t, next.Events, 1) + t.Fatalf("got unwanted event: %#v", next.Events[0].GetPayload()) + case <-time.After(100 * time.Millisecond): + } +} + +func assertEvent(t *testing.T, eventCh <-chan nextResult) *agentpb.Event { + t.Helper() + select { + case next := <-eventCh: + require.NoError(t, next.Err) + require.Len(t, next.Events, 1) + return &next.Events[0] + case <-time.After(100 * time.Millisecond): + t.Fatalf("no event after 100ms") + } + return nil +} + +func assertErr(t *testing.T, eventCh <-chan nextResult) error { + t.Helper() + select { + case next := <-eventCh: + require.Error(t, next.Err) + return next.Err + case <-time.After(100 * time.Millisecond): + t.Fatalf("no err after 100ms") + } + return nil +} + +// assertReset checks that a ResetStream event is send to the subscription +// within 100ms. If allowEOS is true it will ignore any intermediate events that +// come before the reset provided they are EndOfSnapshot events because in many +// cases it's non-deterministic whether the snapshot will complete before the +// acl reset is handled. 
+func assertReset(t *testing.T, eventCh <-chan nextResult, allowEOS bool) { + t.Helper() + timeoutCh := time.After(100 * time.Millisecond) + for { + select { + case next := <-eventCh: + if allowEOS { + if next.Err == nil && len(next.Events) == 1 && next.Events[0].GetEndOfSnapshot() { + continue + } + } + require.Error(t, next.Err) + require.Equal(t, stream.ErrSubscriptionReload, next.Err) + return + case <-timeoutCh: + t.Fatalf("no err after 100ms") + } + } +} + +func createTokenAndWaitForACLEventPublish(t *testing.T, s *Store) *structs.ACLToken { + // Token to use during this test. + token := &structs.ACLToken{ + AccessorID: "3af117a9-2233-4cf4-8ff8-3c749c9906b4", + SecretID: "4268ce0d-d7ae-4718-8613-42eba9036020", + Description: "something", + Policies: []structs.ACLTokenPolicyLink{ + structs.ACLTokenPolicyLink{ + ID: testPolicyID_A, + }, + }, + Roles: []structs.ACLTokenRoleLink{ + structs.ACLTokenRoleLink{ + ID: testRoleID_B, + }, + }, + } + token.SetHash(false) + + // If we subscribe immediately after we create a token we race with the + // publisher that is publishing the ACL token event for the token we just + // created. That means that the subscription we create right after will often + // be immediately reset. The most reliable way to avoid that without just + // sleeping for some arbitrary time is to pre-subscribe using the token before + // it actually exists (which works because the publisher doesn't check tokens + // it assumes something lower down did that) and then wait for it to be reset + // so we know the initial token write event has been sent out before + // continuing... + subscription := &agentpb.SubscribeRequest{ + Topic: agentpb.Topic_ServiceHealth, + Key: "nope", + Token: token.SecretID, + } + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + sub, err := s.publisher.Subscribe(ctx, subscription) + require.NoError(t, err) + + eventCh := testRunSub(sub) + + // Create the ACL token to be used in the subscription. + require.NoError(t, s.ACLTokenSet(2, token.Clone(), false)) + + // Wait for the pre-subscription to be reset + assertReset(t, eventCh, true) + + return token +} + +func TestPublisher_BasicPublish(t *testing.T) { + t.Parallel() + require := require.New(t) + s := testStateStore(t) + + // Register an initial instance + reg := structs.TestRegisterRequest(t) + reg.Service.ID = "web1" + require.NoError(s.EnsureRegistration(1, reg)) + + // Register the subscription. 
+ subscription := &agentpb.SubscribeRequest{ + Topic: agentpb.Topic_ServiceHealth, + Key: reg.Service.Service, + } + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + sub, err := s.publisher.Subscribe(ctx, subscription) + require.NoError(err) + + eventCh := testRunSub(sub) + + // Stream should get the instance and then EndOfSnapshot + e := assertEvent(t, eventCh) + sh := e.GetServiceHealth() + require.NotNil(sh, "expected service health event, got %v", e) + e = assertEvent(t, eventCh) + require.True(e.GetEndOfSnapshot()) + + // Now subscriber should block waiting for updates + assertNoEvent(t, eventCh) + + // Add a new instance of service on a different node + reg2 := reg + reg2.Node = "node2" + require.NoError(s.EnsureRegistration(1, reg)) + + // Subscriber should see registration + e = assertEvent(t, eventCh) + sh = e.GetServiceHealth() + require.NotNil(sh, "expected service health event, got %v", e) +} + +func TestPublisher_ACLTokenUpdate(t *testing.T) { + t.Parallel() + require := require.New(t) + s := testACLTokensStateStore(t) + + // Setup token and wait for good state + token := createTokenAndWaitForACLEventPublish(t, s) + + // Register the subscription. + subscription := &agentpb.SubscribeRequest{ + Topic: agentpb.Topic_ServiceHealth, + Key: "nope", + Token: token.SecretID, + } + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + sub, err := s.publisher.Subscribe(ctx, subscription) + require.NoError(err) + + eventCh := testRunSub(sub) + + // Stream should get EndOfSnapshot + e := assertEvent(t, eventCh) + require.True(e.GetEndOfSnapshot()) + + // Update an unrelated token. + token2 := &structs.ACLToken{ + AccessorID: "a7bbf480-8440-4f55-acfc-6fdca25cb13e", + SecretID: "72e81982-7a0f-491f-a60e-c9c802ac1402", + } + token2.SetHash(false) + require.NoError(s.ACLTokenSet(3, token2.Clone(), false)) + + // Ensure there's no reset event. + assertNoEvent(t, eventCh) + + // Now update the token used in the subscriber. + token3 := &structs.ACLToken{ + AccessorID: "3af117a9-2233-4cf4-8ff8-3c749c9906b4", + SecretID: "4268ce0d-d7ae-4718-8613-42eba9036020", + Description: "something else", + } + token3.SetHash(false) + require.NoError(s.ACLTokenSet(4, token3.Clone(), false)) + + // Ensure the reset event was sent. + err = assertErr(t, eventCh) + require.Equal(stream.ErrSubscriptionReload, err) + + // Register another subscription. + subscription2 := &agentpb.SubscribeRequest{ + Topic: agentpb.Topic_ServiceHealth, + Key: "nope", + Token: token.SecretID, + } + sub2, err := s.publisher.Subscribe(ctx, subscription2) + require.NoError(err) + + eventCh2 := testRunSub(sub2) + + // Expect initial EoS + e = assertEvent(t, eventCh2) + require.True(e.GetEndOfSnapshot()) + + // Delete the unrelated token. + require.NoError(s.ACLTokenDeleteByAccessor(5, token2.AccessorID, nil)) + + // Ensure there's no reset event. + assertNoEvent(t, eventCh2) + + // Delete the token used by the subscriber. + require.NoError(s.ACLTokenDeleteByAccessor(6, token.AccessorID, nil)) + + // Ensure the reset event was sent. + err = assertErr(t, eventCh2) + require.Equal(stream.ErrSubscriptionReload, err) +} + +func TestPublisher_ACLPolicyUpdate(t *testing.T) { + t.Parallel() + require := require.New(t) + s := testACLTokensStateStore(t) + + // Create token and wait for good state + token := createTokenAndWaitForACLEventPublish(t, s) + + // Register the subscription. 
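+	// The topic/key pair here is arbitrary ("nope" never matches a real
+	// service); this test only exercises ACL-token-driven resets of the
+	// subscription, not service health events.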
+ subscription := &agentpb.SubscribeRequest{ + Topic: agentpb.Topic_ServiceHealth, + Key: "nope", + Token: token.SecretID, + } + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + sub, err := s.publisher.Subscribe(ctx, subscription) + require.NoError(err) + + eventCh := testRunSub(sub) + + // Ignore the end of snapshot event + e := assertEvent(t, eventCh) + require.True(e.GetEndOfSnapshot(), "event should be a EoS got %v", e) + + // Update an unrelated policy. + policy2 := structs.ACLPolicy{ + ID: testPolicyID_C, + Name: "foo-read", + Rules: `node "foo" { policy = "read" }`, + Syntax: acl.SyntaxCurrent, + Datacenters: []string{"dc1"}, + } + policy2.SetHash(false) + require.NoError(s.ACLPolicySet(3, &policy2)) + + // Ensure there's no reset event. + assertNoEvent(t, eventCh) + + // Now update the policy used in the subscriber. + policy3 := structs.ACLPolicy{ + ID: testPolicyID_A, + Name: "node-read", + Rules: `node_prefix "" { policy = "write" }`, + Syntax: acl.SyntaxCurrent, + Datacenters: []string{"dc1"}, + } + policy3.SetHash(false) + require.NoError(s.ACLPolicySet(4, &policy3)) + + // Ensure the reset event was sent. + assertReset(t, eventCh, true) + + // Register another subscription. + subscription2 := &agentpb.SubscribeRequest{ + Topic: agentpb.Topic_ServiceHealth, + Key: "nope", + Token: token.SecretID, + } + sub, err = s.publisher.Subscribe(ctx, subscription2) + require.NoError(err) + + eventCh = testRunSub(sub) + + // Ignore the end of snapshot event + e = assertEvent(t, eventCh) + require.True(e.GetEndOfSnapshot(), "event should be a EoS got %v", e) + + // Delete the unrelated policy. + require.NoError(s.ACLPolicyDeleteByID(5, testPolicyID_C, nil)) + + // Ensure there's no reload event. + assertNoEvent(t, eventCh) + + // Delete the policy used by the subscriber. + require.NoError(s.ACLPolicyDeleteByID(6, testPolicyID_A, nil)) + + // Ensure the reload event was sent. + err = assertErr(t, eventCh) + require.Equal(stream.ErrSubscriptionReload, err) + + // Register another subscription. + subscription3 := &agentpb.SubscribeRequest{ + Topic: agentpb.Topic_ServiceHealth, + Key: "nope", + Token: token.SecretID, + } + sub, err = s.publisher.Subscribe(ctx, subscription3) + require.NoError(err) + + eventCh = testRunSub(sub) + + // Ignore the end of snapshot event + e = assertEvent(t, eventCh) + require.True(e.GetEndOfSnapshot(), "event should be a EoS got %v", e) + + // Now update the policy used in role B, but not directly in the token. + policy4 := structs.ACLPolicy{ + ID: testPolicyID_B, + Name: "node-read", + Rules: `node_prefix "foo" { policy = "read" }`, + Syntax: acl.SyntaxCurrent, + Datacenters: []string{"dc1"}, + } + policy4.SetHash(false) + require.NoError(s.ACLPolicySet(7, &policy4)) + + // Ensure the reset event was sent. + assertReset(t, eventCh, true) +} + +func TestPublisher_ACLRoleUpdate(t *testing.T) { + t.Parallel() + require := require.New(t) + s := testACLTokensStateStore(t) + + // Create token and wait for good state + token := createTokenAndWaitForACLEventPublish(t, s) + + // Register the subscription. 
+ subscription := &agentpb.SubscribeRequest{ + Topic: agentpb.Topic_ServiceHealth, + Key: "nope", + Token: token.SecretID, + } + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + sub, err := s.publisher.Subscribe(ctx, subscription) + require.NoError(err) + + eventCh := testRunSub(sub) + + // Stream should get EndOfSnapshot + e := assertEvent(t, eventCh) + require.True(e.GetEndOfSnapshot()) + + // Update an unrelated role (the token has role testRoleID_B). + role := structs.ACLRole{ + ID: testRoleID_A, + Name: "unrelated-role", + Description: "test", + } + role.SetHash(false) + require.NoError(s.ACLRoleSet(3, &role)) + + // Ensure there's no reload event. + assertNoEvent(t, eventCh) + + // Now update the role used by the token in the subscriber. + role2 := structs.ACLRole{ + ID: testRoleID_B, + Name: "my-new-role", + Description: "changed", + } + role2.SetHash(false) + require.NoError(s.ACLRoleSet(4, &role2)) + + // Ensure the reload event was sent. + assertReset(t, eventCh, false) + + // Register another subscription. + subscription2 := &agentpb.SubscribeRequest{ + Topic: agentpb.Topic_ServiceHealth, + Key: "nope", + Token: token.SecretID, + } + sub, err = s.publisher.Subscribe(ctx, subscription2) + require.NoError(err) + + eventCh = testRunSub(sub) + + // Ignore the end of snapshot event + e = assertEvent(t, eventCh) + require.True(e.GetEndOfSnapshot(), "event should be a EoS got %v", e) + + // Delete the unrelated policy. + require.NoError(s.ACLRoleDeleteByID(5, testRoleID_A, nil)) + + // Ensure there's no reload event. + assertNoEvent(t, eventCh) + + // Delete the policy used by the subscriber. + require.NoError(s.ACLRoleDeleteByID(6, testRoleID_B, nil)) + + // Ensure the reload event was sent. + assertReset(t, eventCh, false) +} diff --git a/agent/consul/state/stream_topics.go b/agent/consul/state/stream_topics.go new file mode 100644 index 0000000000..a4834da3e1 --- /dev/null +++ b/agent/consul/state/stream_topics.go @@ -0,0 +1,49 @@ +package state + +import ( + "github.com/hashicorp/consul/agent/agentpb" + "github.com/hashicorp/consul/agent/consul/stream" + memdb "github.com/hashicorp/go-memdb" +) + +// unboundSnapFn is a stream.SnapFn with state store as the first argument. This +// is bound to a concrete state store instance in the EventPublisher on startup. +type unboundSnapFn func(*Store, *agentpb.SubscribeRequest, *stream.EventBuffer) (uint64, error) +type unboundProcessChangesFn func(*Store, *txnWrapper, memdb.Changes) ([]agentpb.Event, error) + +// topicHandlers describes the methods needed to process a streaming +// subscription for a given topic. +type topicHandlers struct { + Snapshot unboundSnapFn + ProcessChanges unboundProcessChangesFn +} + +// topicRegistry is a map of topic handlers. It must only be written to during +// init(). +var topicRegistry map[agentpb.Topic]topicHandlers + +func init() { + topicRegistry = map[agentpb.Topic]topicHandlers{ + agentpb.Topic_ServiceHealth: topicHandlers{ + Snapshot: (*Store).ServiceHealthSnapshot, + ProcessChanges: (*Store).ServiceHealthEventsFromChanges, + }, + agentpb.Topic_ServiceHealthConnect: topicHandlers{ + Snapshot: (*Store).ServiceHealthConnectSnapshot, + // Note there is no ProcessChanges since Connect events are published by + // the same event publisher as regular health events to avoid duplicating + // lots of filtering on every commit. + }, + // For now we don't actually support subscribing to ACL* topics externally + // so these have no Snapshot methods yet. 
We do, though, need a
+		// ProcessChanges func to publish the partial events on ACL changes so
+		// that we can invalidate other subscriptions if their effective ACL
+		// permissions change.
+		agentpb.Topic_ACLTokens: topicHandlers{
+			ProcessChanges: (*Store).ACLEventsFromChanges,
+		},
+		// Note no ACLPolicies/ACLRoles defined yet because we publish all events
+		// from one handler to save on iterating/filtering and duplicating code and
+		// there are no snapshots for these yet per comment above.
+	}
+}
diff --git a/agent/consul/stream/event_buffer.go b/agent/consul/stream/event_buffer.go
new file mode 100644
index 0000000000..aa6a2c89c8
--- /dev/null
+++ b/agent/consul/stream/event_buffer.go
@@ -0,0 +1,248 @@
+package stream
+
+import (
+	"context"
+	"errors"
+	"sync/atomic"
+
+	"github.com/hashicorp/consul/agent/agentpb"
+)
+
+// EventBuffer is a single-writer, multiple-reader, unlimited-length concurrent
+// buffer of events that have been published on a topic. The buffer is
+// effectively just the head of an atomically updated singly-linked list. Atomic
+// accesses are usually to be suspected as premature optimization, but this
+// specific design has several important features that significantly simplify a
+// lot of our PubSub machinery.
+//
+// The buffer itself only ever tracks the most recent set of events published,
+// so if there are no consumers, older events are automatically garbage
+// collected. Notification of new events is done by closing a channel on the
+// previous head, allowing efficient broadcast to many watchers without having
+// to run multiple goroutines or deliver to O(N) separate channels.
+//
+// Because it's a linked list with atomically updated pointers, readers don't
+// have to take a lock and can consume at their own pace. We also don't have to
+// impose a fixed limit on the number of items, which means we don't have to
+// trade off buffer-length configuration to balance wasting memory against
+// handling the occasional slow reader.
+//
+// The buffer is used to deliver all messages broadcast to a topic for active
+// subscribers to consume, but it is also an effective way to both deliver and
+// optionally cache snapshots per topic and key. By using an EventBuffer,
+// snapshot functions don't have to read the whole snapshot into memory before
+// delivery - they can stream from memdb. However, simply by storing a pointer
+// to the first event in the buffer, we can cache the buffered events for future
+// watchers on the same topic. Finally, once we've delivered all the snapshot
+// events to the buffer, we can append a next element which is the first topic
+// buffer element with a higher index, so consumers can just keep reading the
+// same buffer.
+//
+// A huge benefit here is that caching snapshots becomes very simple - we don't
+// have to do any additional bookkeeping to figure out when to truncate the
+// topic buffer to make sure the snapshot is still usable, or run into issues
+// where the cached snapshot is no longer useful, since the buffer will keep
+// elements around only as long as either the cache or a subscriber needs them.
+// So we can use whatever simple timeout logic we like to decide how long to
+// keep caches (or if we should keep them at all) and the buffers will
+// automatically keep the events we need to make that work for exactly the
+// optimal amount of time and no longer.
+//
+// A new buffer is constructed with a sentinel "empty" BufferItem that has a nil
+// Events array.
This enables subscribers to start watching for the next update +// immediately. +// +// The zero value EventBuffer is _not_ a usable type since it has not been +// initialized with an empty bufferItem so can't be used to wait for the first +// published event. Call NewEventBuffer to construct a new buffer. +// +// Calls to Append or AppendBuffer that mutate the head must be externally +// synchronized. This allows systems that already serialize writes to append +// without lock overhead (e.g. a snapshot goroutine appending thousands of +// events). +type EventBuffer struct { + head atomic.Value +} + +// NewEventBuffer creates an EventBuffer ready for use. +func NewEventBuffer() *EventBuffer { + b := &EventBuffer{} + b.head.Store(NewBufferItem()) + return b +} + +// Append a set of events from one raft operation to the buffer and notify +// watchers. Note that events must not have been previously made available to +// any other goroutine since we may mutate them to ensure ACL Rules are +// populated. After calling append, the caller must not make any further +// mutations to the events as they may have been exposed to subscribers in other +// goroutines. Append only supports a single concurrent caller and must be +// externally synchronized with other Append, AppendBuffer or AppendErr calls. +func (b *EventBuffer) Append(events []agentpb.Event) { + // Push events to the head + it := NewBufferItem() + it.Events = events + b.AppendBuffer(it) +} + +// AppendBuffer joins another buffer which may be the tail of a separate buffer +// for example a buffer that's had the events from a snapshot appended may +// finally by linked to the topic buffer for the subsequent events so +// subscribers can seamlessly consume the updates. Note that Events in item must +// already be fully populated with ACL rules and must not be mutated further as +// they may have already been published to subscribers. +// +// AppendBuffer only supports a single concurrent caller and must be externally +// synchronized with other Append, AppendBuffer or AppendErr calls. +func (b *EventBuffer) AppendBuffer(item *BufferItem) { + // First store it as the next node for the old head this ensures once it's + // visible to new searchers the linked list is already valid. Not sure it + // matters but this seems nicer. + oldHead := b.Head() + oldHead.link.next.Store(item) + b.head.Store(item) + + // Now it's added invalidate the oldHead to notify waiters + close(oldHead.link.ch) + // don't set chan to nil since that will race with readers accessing it. +} + +// AppendErr publishes an error result to the end of the buffer. This is +// considered terminal and will cause all subscribers to end their current +// streaming subscription and return the error. AppendErr only supports a +// single concurrent caller and must be externally synchronized with other +// Append, AppendBuffer or AppendErr calls. +func (b *EventBuffer) AppendErr(err error) { + b.AppendBuffer(&BufferItem{Err: err}) +} + +// Head returns the current head of the buffer. It will always exist but it may +// be a "sentinel" empty item with a nil Events slice to allow consumers to +// watch for the next update. Consumers should always check for empty Events and +// treat them as no-ops. Will panic if EventBuffer was not initialized correctly +// with EventBuffer. +func (b *EventBuffer) Head() *BufferItem { + return b.head.Load().(*BufferItem) +} + +// BufferItem represents a set of events published by a single raft operation. 
+// The first item returned by a newly constructed buffer will have nil Events +// and should be considered a "sentinel" value just useful for waiting on the +// next events via Next. +// +// To iterate to the next event, a Next method may be called which may block if +// there is no next element yet. +// +// Holding a pointer to the item keeps all the events published since in memory +// so it's important that subscribers don't hold pointers to buffer items after +// they have been delivered except where it's intentional to maintain a cache or +// trailing store of events for performance reasons. +// +// Subscribers must not mutate the BufferItem or the Events or Encoded payloads +// inside as these are shared between all readers. +type BufferItem struct { + // Events is the set of events published at one raft index. This may be nil as + // a sentinel value to allow watching for the first event in a buffer. Callers + // should check and skip nil Events at any point in the buffer. It will also + // be nil if the producer appends an Error event because they can't complete + // the request to populate the buffer. Err will be non-nil in this case. + Events []agentpb.Event + + // Err is non-nil if the producer can't complete their task and terminates the + // buffer. Subscribers should return the error to clients and cease attempting + // to read from the buffer. + Err error + + // link holds the next pointer and channel. This extra bit of indirection + // allows us to splice buffers together at arbitrary points without including + // events in one buffer just for the side-effect of watching for the next set. + // The link may not be mutated once the event is appended to a buffer. + link *bufferLink +} + +type bufferLink struct { + // next is an atomically updated pointer to the next event in the buffer. It + // is written exactly once by the single published and will always be set if + // ch is closed. + next atomic.Value + + // ch is closed when the next event is published. It should never be mutated + // (e.g. set to nil) as that is racey, but is closed once when the next event + // is published. the next pointer will have been set by the time this is + // closed. + ch chan struct{} +} + +// NewBufferItem returns a blank buffer item with a link and chan ready to have +// the fields set and be appended to a buffer. +func NewBufferItem() *BufferItem { + return &BufferItem{ + link: &bufferLink{ + ch: make(chan struct{}), + }, + } +} + +// Next return the next buffer item in the buffer. It may block until ctx is +// cancelled or until the next item is published. +func (i *BufferItem) Next(ctx context.Context) (*BufferItem, error) { + // See if there is already a next value, block if so. Note we don't rely on + // state change (chan nil) as that's not threadsafe but detecting close is. + select { + case <-ctx.Done(): + return nil, ctx.Err() + case <-i.link.ch: + } + + // If channel closed, there must be a next item to read + nextRaw := i.link.next.Load() + if nextRaw == nil { + // shouldn't be possible + return nil, errors.New("invalid next item") + } + next := nextRaw.(*BufferItem) + if next.Err != nil { + return nil, next.Err + } + if len(next.Events) == 0 { + // Skip this event + return next.Next(ctx) + } + return next, nil +} + +// NextNoBlock returns the next item in the buffer without blocking. If it +// reaches the most recent item it will return nil and no error. 
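To make the follow semantics concrete, here is a small illustrative sketch (not part of this change; the function is hypothetical) of a reader catching up on a single append:

// exampleFollow shows the reader/writer contract in miniature. A reader holds
// a *BufferItem and calls Next, which blocks until the single writer appends;
// items older than the oldest live reference are garbage collected
// automatically.
func exampleFollow(ctx context.Context) ([]agentpb.Event, error) {
	buf := NewEventBuffer()
	sentinel := buf.Head() // empty sentinel item; readers wait here for the first publish

	// The single writer publishes one event (normally on another goroutine).
	buf.Append([]agentpb.Event{{Topic: agentpb.Topic_ServiceHealth, Index: 1}})

	next, err := sentinel.Next(ctx) // returns immediately since the append already happened
	if err != nil {
		return nil, err
	}
	return next.Events, nil
}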
+func (i *BufferItem) NextNoBlock() (*BufferItem, error) { + nextRaw := i.link.next.Load() + if nextRaw == nil { + return nil, nil + } + next := nextRaw.(*BufferItem) + if next.Err != nil { + return nil, next.Err + } + if len(next.Events) == 0 { + // Skip this event + return next.NextNoBlock() + } + return next, nil +} + +// FollowAfter returns either the next item in the buffer if there is already +// one, or if not it returns an empty item (that will be ignored by subscribers) +// that has the same link as the current buffer so that it will be notified of +// future updates in the buffer without including the current item. +func (i *BufferItem) FollowAfter() (*BufferItem, error) { + next, err := i.NextNoBlock() + if err != nil { + return nil, err + } + if next == nil { + // Return an empty item that can be followed to the next item published. + item := &BufferItem{} + item.link = i.link + return item, nil + } + return next, nil +} diff --git a/agent/consul/stream/event_buffer_test.go b/agent/consul/stream/event_buffer_test.go new file mode 100644 index 0000000000..408ecdeab8 --- /dev/null +++ b/agent/consul/stream/event_buffer_test.go @@ -0,0 +1,89 @@ +package stream + +import ( + "context" + fmt "fmt" + "math/rand" + "testing" + time "time" + + "github.com/stretchr/testify/assert" + + "github.com/hashicorp/consul/agent/agentpb" +) + +func TestEventBufferFuzz(t *testing.T) { + // A property-based test to ensure that under heavy concurrent use trivial + // correctness properties are not violated (and that -race doesn't complain). + + nReaders := 1000 + nMessages := 1000 + + b := NewEventBuffer() + + // Start a write goroutine that will publish 10000 messages with sequential + // indexes and some jitter in timing (to allow clients to "catch up" and block + // waiting for updates). + go func() { + // z is a Zipfian distribution that gives us a number of milliseconds to + // sleep which are mostly low - near zero but occasionally spike up to near + // 100. + z := rand.NewZipf(rand.New(rand.NewSource(1)), 1.5, 1.5, 50) + + for i := 0; i < nMessages; i++ { + // Event content is arbitrary and not valid for our use of buffers in + // streaming - here we only care about the semantics of the buffer. + e := agentpb.Event{ + Index: uint64(i), // Indexes should be contiguous + Topic: agentpb.Topic_ServiceHealth, + Payload: &agentpb.Event_EndOfSnapshot{ + EndOfSnapshot: true, + }, + } + b.Append([]agentpb.Event{e}) + // Sleep sometimes for a while to let some subscribers catch up + wait := time.Duration(z.Uint64()) * time.Millisecond + time.Sleep(wait) + } + }() + + // Run n subscribers following and verifying + errCh := make(chan error, nReaders) + + // Load head here so all subscribers start from the same point or they might + // no run until several appends have already happened. 
+ head := b.Head() + + for i := 0; i < nReaders; i++ { + go func(i int) { + expect := uint64(0) + item := head + var err error + for { + item, err = item.Next(context.Background()) + if err != nil { + errCh <- fmt.Errorf("subscriber %05d failed getting next %d: %s", i, + expect, err) + return + } + if item.Events[0].Index != expect { + errCh <- fmt.Errorf("subscriber %05d got bad event want=%d, got=%d", i, + expect, item.Events[0].Index) + return + } + expect++ + if expect == uint64(nMessages) { + // Succeeded + errCh <- nil + return + } + } + }(i) + } + + // Wait for all readers to finish one way or other + for i := 0; i < nReaders; i++ { + err := <-errCh + assert.NoError(t, err) + } +} diff --git a/agent/consul/stream/event_snapshot.go b/agent/consul/stream/event_snapshot.go new file mode 100644 index 0000000000..bba68397fd --- /dev/null +++ b/agent/consul/stream/event_snapshot.go @@ -0,0 +1,137 @@ +package stream + +import ( + "github.com/hashicorp/consul/agent/agentpb" +) + +// EventSnapshot represents the state of memdb for a given topic and key at some +// point in time. It is modelled as a buffer of events so that snapshots can be +// streamed to possibly multiple subscribers concurrently, and can be trivially +// cached by just keeping the Snapshot around. Once the EventSnapshot is dropped +// from memory, any subscribers still reading from it may do so by following +// their pointers but eventually the snapshot is garbage collected automatically +// by Go's runtime, simplifying snapshot and buffer management dramatically. +type EventSnapshot struct { + // Request that this snapshot satisfies. + Request *agentpb.SubscribeRequest + + // Snap is the first item in the buffer containing the snapshot. Once the + // snapshot is complete, subsequent update's BufferItems are appended such + // that subscribers just need to follow this buffer for the duration of their + // subscription stream. + Snap *BufferItem + + // snapBuffer is the Head of the snapshot buffer the fn should write to. + snapBuffer *EventBuffer + + // topicBufferHead stored the current most-recent published item from before + // the snapshot was taken such that anything published during snapshot + // publishing can be captured. + topicBufferHead *BufferItem + + // SnapFn is the function that will make the snapshot for this request. + fn SnapFn +} + +// SnapFn is the type of function needed to generate a snapshot for a topic and +// key. +type SnapFn func(req *agentpb.SubscribeRequest, buf *EventBuffer) (uint64, error) + +// NewEventSnapshot creates a snapshot buffer based on the subscription request. +// The current buffer head for the topic in question is passed so that once the +// snapshot is complete and has been delivered into the buffer, any events +// published during snapshotting can be immediately appended and won't be +// missed. Once the snapshot is delivered the topic buffer is spliced onto the +// snapshot buffer so that subscribers will naturally follow from the snapshot +// to wait for any subsequent updates. 
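For reference (not part of this change), a minimal sketch of the SnapFn contract that NewEventSnapshot assumes; the function and its index value are made up for illustration.

// exampleSnapFn streams the snapshot's events into buf (the buffer is
// unbounded, so large snapshots can be appended incrementally) and returns the
// index the snapshot was taken at. doSnapshot below then appends the
// end-of-snapshot marker and splices on the live topic buffer.
func exampleSnapFn(req *agentpb.SubscribeRequest, buf *EventBuffer) (uint64, error) {
	const snapIndex uint64 = 1234 // hypothetical index the snapshot represents
	buf.Append([]agentpb.Event{{
		Topic: req.Topic,
		Key:   req.Key,
		Index: snapIndex,
		// Payload omitted for brevity; a real snapshot attaches e.g. a
		// ServiceHealth payload per event.
	}})
	return snapIndex, nil
}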
+func NewEventSnapshot(req *agentpb.SubscribeRequest, topicBufferHead *BufferItem, fn SnapFn) *EventSnapshot { + buf := NewEventBuffer() + s := &EventSnapshot{ + Request: req, + Snap: buf.Head(), + snapBuffer: buf, + topicBufferHead: topicBufferHead, + fn: fn, + } + go s.doSnapshot() + return s +} + +func (s *EventSnapshot) doSnapshot() { + // Call snapshot func + idx, err := s.fn(s.Request, s.snapBuffer) + if err != nil { + // Append an error result to signal to subscribers that this snapshot is no + // good. + s.snapBuffer.AppendErr(err) + return + } + + // We wrote the snapshot events to the buffer, send the "end of snapshot" event + s.snapBuffer.Append([]agentpb.Event{agentpb.Event{ + Topic: s.Request.Topic, + Key: s.Request.Key, + Index: idx, + Payload: &agentpb.Event_EndOfSnapshot{ + EndOfSnapshot: true, + }, + }}) + + // Now splice on the topic buffer. We need to iterate through the buffer to + // find the first event after the current snapshot. + item := s.topicBufferHead + for { + // Find the next item that we should include. + next, err := item.NextNoBlock() + if err != nil { + // Append an error result to signal to subscribers that this snapshot is + // no good. + s.snapBuffer.AppendErr(err) + return + } + + if next == nil { + // This is the head of the topic buffer (or was just now which is after + // the snapshot completed). We don't want any of the events (if any) in + // the snapshot buffer as they came before the snapshot but we do need to + // wait for the next update. + follow, err := item.FollowAfter() + if err != nil { + s.snapBuffer.AppendErr(err) + return + } + + s.snapBuffer.AppendBuffer(follow) + // We are done, subscribers will now follow future updates to the topic + // after reading the snapshot events. + return + } + + if next.Err != nil { + s.snapBuffer.AppendErr(next.Err) + return + } + + if len(next.Events) > 0 { + if next.Events[0].Index > idx { + // We've found an update in the topic buffer that happened after our + // snapshot was taken, splice it into the snapshot buffer so subscribers + // can continue to read this and others after it. + s.snapBuffer.AppendBuffer(next) + return + } + } + // We don't need this item, continue to next + item = next + } +} + +// Err returns an error if the snapshot func has failed with an error or nil +// otherwise. Nil doesn't necessarily mean there won't be an error but there +// hasn't been one yet. +func (s *EventSnapshot) Err() error { + // Fetch the head of the buffer, this is atomic. If the snapshot func errored + // then the last event will be an error. + head := s.snapBuffer.Head() + return head.Err +} diff --git a/agent/consul/stream/event_snapshot_test.go b/agent/consul/stream/event_snapshot_test.go new file mode 100644 index 0000000000..685812fae2 --- /dev/null +++ b/agent/consul/stream/event_snapshot_test.go @@ -0,0 +1,191 @@ +package stream + +import ( + "context" + fmt "fmt" + "testing" + time "time" + + "github.com/hashicorp/consul/agent/agentpb" + "github.com/stretchr/testify/require" +) + +func TestEventSnapshot(t *testing.T) { + // Setup a dummy state that we can manipulate easily. The properties we care + // about are that we publish some sequence of events as a snapshot and then + // follow them up with "live updates". We control the interleavings. Our state + // consists of health events (only type fully defined so far) for service + // instances with consecutive ID numbers starting from 0 (e.g. test-000, + // test-001). The snapshot is delivered at index 1000. 
diff --git a/agent/consul/stream/event_snapshot_test.go b/agent/consul/stream/event_snapshot_test.go
new file mode 100644
index 0000000000..685812fae2
--- /dev/null
+++ b/agent/consul/stream/event_snapshot_test.go
@@ -0,0 +1,191 @@
+package stream
+
+import (
+	"context"
+	fmt "fmt"
+	"testing"
+	time "time"
+
+	"github.com/hashicorp/consul/agent/agentpb"
+	"github.com/stretchr/testify/require"
+)
+
+func TestEventSnapshot(t *testing.T) {
+	// Set up a dummy state that we can manipulate easily. The properties we care
+	// about are that we publish some sequence of events as a snapshot and then
+	// follow them up with "live updates". We control the interleavings. Our state
+	// consists of health events (the only type fully defined so far) for service
+	// instances with consecutive ID numbers starting from 0 (e.g. test-000,
+	// test-001). The snapshot is delivered at index 1000. updatesBeforeSnap
+	// controls how many updates are delivered _before_ the snapshot is complete
+	// (with an index < 1000). updatesAfterSnap controls the number of updates
+	// delivered after the snapshot (index > 1000).
+	//
+	// In all cases the invariant should be that we end up with all of the
+	// instances in the snapshot, plus any delivered _after_ the snapshot index,
+	// but none delivered _before_ the snapshot index, otherwise we may have an
+	// inconsistent snapshot.
+	cases := []struct {
+		name              string
+		snapshotSize      int
+		updatesBeforeSnap int
+		updatesAfterSnap  int
+	}{
+		{
+			name:              "snapshot with subsequent mutations",
+			snapshotSize:      10,
+			updatesBeforeSnap: 0,
+			updatesAfterSnap:  10,
+		},
+		{
+			name:              "snapshot with concurrent mutations",
+			snapshotSize:      10,
+			updatesBeforeSnap: 5,
+			updatesAfterSnap:  5,
+		},
+		{
+			name:              "empty snapshot with subsequent mutations",
+			snapshotSize:      0,
+			updatesBeforeSnap: 0,
+			updatesAfterSnap:  10,
+		},
+		{
+			name:              "empty snapshot with concurrent mutations",
+			snapshotSize:      0,
+			updatesBeforeSnap: 5,
+			updatesAfterSnap:  5,
+		},
+	}
+
+	snapIndex := uint64(1000)
+
+	for _, tc := range cases {
+		tc := tc
+		t.Run(tc.name, func(t *testing.T) {
+			require.True(t, tc.updatesBeforeSnap < 999,
+				"bad test param updatesBeforeSnap must be less than the snapshot"+
+					" index (%d) minus one (%d), got: %d", snapIndex, snapIndex-1,
+				tc.updatesBeforeSnap)
+
+			// Create a snapshot func that will deliver registration events.
+			snFn := testHealthConsecutiveSnapshotFn(tc.snapshotSize, snapIndex)
+
+			// Create a topic buffer for updates.
+			tb := NewEventBuffer()
+
+			// Capture the topic buffer head now so updatesBeforeSnap are "concurrent"
+			// and are seen by the EventSnapshot once it completes the snapshot.
+			tbHead := tb.Head()
+
+			// Deliver any pre-snapshot events simulating updates that occur after the
+			// topic buffer is captured during a Subscribe call, but before the
+			// snapshot of the FSM is made.
+			for i := tc.updatesBeforeSnap; i > 0; i-- {
+				index := snapIndex - uint64(i)
+				// Use an instance index that's unique and should never appear in the
+				// output so we can be sure these were not included as they came before
+				// the snapshot.
+				tb.Append([]agentpb.Event{testHealthEvent(index, 10000+i)})
+			}
+
+			// Create the EventSnapshot (this will call snFn in another goroutine).
+			// The Request is ignored by the SnapFn so it doesn't matter for now.
+			es := NewEventSnapshot(&agentpb.SubscribeRequest{}, tbHead, snFn)
+
+			// Deliver any post-snapshot events simulating updates that occur
+			// logically after the snapshot. It doesn't matter that these might
+			// actually be appended before the snapshot fn executes in another
+			// goroutine since it's operating on a possibly stale "snapshot". This is
+			// the same as reality with the state store, where updates that occur
+			// after the snapshot is taken, but while the SnapFn is still running,
+			// must be captured correctly.
+			for i := 0; i < tc.updatesAfterSnap; i++ {
+				index := snapIndex + 1 + uint64(i)
+				// Use an instance index that's unique.
+				tb.Append([]agentpb.Event{testHealthEvent(index, 20000+i)})
+			}
+
+			// Now read the snapshot buffer until we've received everything we expect.
+			// Don't wait too long in case we get stuck.
+			ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+			defer cancel()
+
+			snapIDs := make([]string, 0, tc.snapshotSize)
+			updateIDs := make([]string, 0, tc.updatesAfterSnap)
+			snapDone := false
+			curItem := es.Snap
+			var err error
+		RECV:
+			for {
+				curItem, err = curItem.Next(ctx)
+				// This error is typically a timeout, so dump the state to aid debugging.
+				require.NoError(t, err,
+					"current state: snapDone=%v snapIDs=%s updateIDs=%s", snapDone,
+					snapIDs, updateIDs)
+				e := curItem.Events[0]
+				if snapDone {
+					sh := e.GetServiceHealth()
+					require.NotNil(t, sh, "want health event got: %#v", e.Payload)
+					updateIDs = append(updateIDs, sh.CheckServiceNode.Service.ID)
+					if len(updateIDs) == tc.updatesAfterSnap {
+						// We're done!
+						break RECV
+					}
+				} else if e.GetEndOfSnapshot() {
+					snapDone = true
+				} else {
+					sh := e.GetServiceHealth()
+					require.NotNil(t, sh, "want health event got: %#v", e.Payload)
+					snapIDs = append(snapIDs, sh.CheckServiceNode.Service.ID)
+				}
+			}
+
+			// Validate the event IDs we got delivered.
+			require.Equal(t, genSequentialIDs(0, tc.snapshotSize), snapIDs)
+			require.Equal(t, genSequentialIDs(20000, 20000+tc.updatesAfterSnap), updateIDs)
+		})
+	}
+}
+
+func genSequentialIDs(start, end int) []string {
+	ids := make([]string, 0, end-start)
+	for i := start; i < end; i++ {
+		ids = append(ids, fmt.Sprintf("test-%03d", i))
+	}
+	return ids
+}
+
+func testHealthConsecutiveSnapshotFn(size int, index uint64) SnapFn {
+	return func(req *agentpb.SubscribeRequest, buf *EventBuffer) (uint64, error) {
+		for i := 0; i < size; i++ {
+			// The event content is arbitrary; we are just using Health because it's
+			// the first type defined. We just want a set of things with consecutive
+			// names.
+			buf.Append([]agentpb.Event{testHealthEvent(index, i)})
+		}
+		return index, nil
+	}
+}
+
+func testHealthEvent(index uint64, n int) agentpb.Event {
+	return agentpb.Event{
+		Index: index,
+		Topic: agentpb.Topic_ServiceHealth,
+		Payload: &agentpb.Event_ServiceHealth{
+			ServiceHealth: &agentpb.ServiceHealthUpdate{
+				Op: agentpb.CatalogOp_Register,
+				CheckServiceNode: &agentpb.CheckServiceNode{
+					Node: &agentpb.Node{
+						Node:    "n1",
+						Address: "10.10.10.10",
+					},
+					Service: &agentpb.NodeService{
+						ID:      fmt.Sprintf("test-%03d", n),
+						Service: "test",
+						Port:    8080,
+					},
+				},
+			},
+		},
+	}
+}
diff --git a/agent/consul/stream/subscription.go b/agent/consul/stream/subscription.go
new file mode 100644
index 0000000000..42c396a130
--- /dev/null
+++ b/agent/consul/stream/subscription.go
@@ -0,0 +1,128 @@
+package stream
+
+import (
+	context "context"
+	"errors"
+	"sync/atomic"
+
+	"github.com/hashicorp/consul/agent/agentpb"
+)
+
+const (
+	// SubscriptionStateOpen is the default state of a subscription.
+	SubscriptionStateOpen uint32 = 0
+
+	// SubscriptionStateCloseReload signals that the subscription was closed by
+	// the server and the client should retry.
+	SubscriptionStateCloseReload uint32 = 1
+)
+
+var (
+	// ErrSubscriptionReload is an error signalling that a reload event should be
+	// sent to the client and the server should close the subscription.
+	ErrSubscriptionReload = errors.New("subscription closed by server, client should retry")
+)
+
+// Subscription holds state about a single Subscribe call. Subscribe clients
+// access their next event by calling Next(). This may initially include the
+// snapshot events to catch them up if they are new or behind.
+type Subscription struct {
+	// state is accessed atomically; 0 means open, 1 means closed with reload.
+	state uint32
+
+	// req is the request that we are responding to.
+	req *agentpb.SubscribeRequest
+
+	// currentItem stores the current snapshot or topic buffer item we are on. It
+	// is mutated by calls to Next.
+	currentItem *BufferItem
+
+	// ctx is the Subscription context that wraps the context of the streaming RPC
+	// handler call.
+	ctx context.Context
+
+	// cancelFn stores the context cancel function that will wake up the
+	// in-progress Next call on a server-initiated state change, e.g. Reload.
+	cancelFn func()
+}
+
+// NewSubscription returns a new subscription.
+func NewSubscription(ctx context.Context, req *agentpb.SubscribeRequest, item *BufferItem) *Subscription {
+	subCtx, cancel := context.WithCancel(ctx)
+	return &Subscription{
+		ctx:         subCtx,
+		cancelFn:    cancel,
+		req:         req,
+		currentItem: item,
+	}
+}
+
+// Next returns the next set of events to deliver. It must not be called
+// concurrently from multiple goroutines as it mutates the Subscription.
+func (s *Subscription) Next() ([]agentpb.Event, error) {
+	state := atomic.LoadUint32(&s.state)
+	if state == SubscriptionStateCloseReload {
+		return nil, ErrSubscriptionReload
+	}
+
+	for {
+		next, err := s.currentItem.Next(s.ctx)
+		if err != nil {
+			// Check we didn't return because of a state change cancelling the context.
+			state := atomic.LoadUint32(&s.state)
+			if state == SubscriptionStateCloseReload {
+				return nil, ErrSubscriptionReload
+			}
+			return nil, err
+		}
+		// Advance our cursor for the next loop iteration or the next call.
+		s.currentItem = next
+
+		// Assume the happy path where all events (or none) are relevant.
+		allMatch := true
+
+		// If there is a specific key, see if we need to filter any events.
+		if s.req.Key != "" {
+			for _, e := range next.Events {
+				if s.req.Key != e.Key {
+					allMatch = false
+					break
+				}
+			}
+		}
+
+		// Only if we need to filter events should we bother allocating a new slice,
+		// as this is a hot loop.
+		events := next.Events
+		if !allMatch {
+			events = make([]agentpb.Event, 0, len(next.Events))
+			for _, e := range next.Events {
+				// Only return it if the key matches.
+				if s.req.Key == "" || s.req.Key == e.Key {
+					events = append(events, e)
+				}
+			}
+		}
+
+		if len(events) > 0 {
+			return events, nil
+		}
+		// Keep looping until we find some events we are interested in.
+	}
+}
+
+// CloseReload closes the stream and signals that the subscriber should reload.
+// It is safe to call from any goroutine.
+func (s *Subscription) CloseReload() {
+	swapped := atomic.CompareAndSwapUint32(&s.state, SubscriptionStateOpen,
+		SubscriptionStateCloseReload)
+
+	if swapped {
+		s.cancelFn()
+	}
+}
+
+// Request returns the request object that started the subscription.
+func (s *Subscription) Request() *agentpb.SubscribeRequest {
+	return s.req
+}
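To show how these pieces are meant to fit together, here is a rough, hypothetical server-side loop; the actual Subscribe endpoint is not part of this diff, and topicBuf, snapFn, and ctx are stand-ins like the ones sketched earlier.

    req := &agentpb.SubscribeRequest{Topic: agentpb.Topic_ServiceHealth, Key: "web"}
    es := NewEventSnapshot(req, topicBuf.Head(), snapFn)
    sub := NewSubscription(ctx, req, es.Snap)

    for {
        events, err := sub.Next()
        if err == ErrSubscriptionReload {
            break // tell the client to resubscribe from scratch, then end the stream
        }
        if err != nil {
            break // ctx was cancelled or the buffer reported an error
        }
        for _, e := range events {
            _ = e // send the event to the client here
        }
    }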
diff --git a/agent/consul/stream/subscription_test.go b/agent/consul/stream/subscription_test.go
new file mode 100644
index 0000000000..437d3872ce
--- /dev/null
+++ b/agent/consul/stream/subscription_test.go
@@ -0,0 +1,152 @@
+package stream
+
+import (
+	"context"
+	"testing"
+	time "time"
+
+	"github.com/hashicorp/consul/agent/agentpb"
+	"github.com/stretchr/testify/require"
+)
+
+func TestSubscription(t *testing.T) {
+	eb := NewEventBuffer()
+
+	index := uint64(100)
+
+	startHead := eb.Head()
+
+	// Start with an event in the buffer.
+	testPublish(index, eb, "test")
+
+	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+	defer cancel()
+
+	// Create a subscription.
+	req := &agentpb.SubscribeRequest{
+		Topic: agentpb.Topic_ServiceHealth,
+		Key:   "test",
+	}
+	sub := NewSubscription(ctx, req, startHead)
+
+	// The first call to sub.Next should return our published event immediately.
+	start := time.Now()
+	got, err := sub.Next()
+	elapsed := time.Since(start)
+	require.NoError(t, err)
+	require.True(t, elapsed < 200*time.Millisecond,
+		"Event should have been delivered immediately, took %s", elapsed)
+	require.Len(t, got, 1)
+	require.Equal(t, index, got[0].Index)
+
+	// Schedule an event to be published in a little while.
+	index++
+	start = time.Now()
+	time.AfterFunc(200*time.Millisecond, func() {
+		testPublish(index, eb, "test")
+	})
+
+	// The next call should block until the event is delivered.
+	got, err = sub.Next()
+	elapsed = time.Since(start)
+	require.NoError(t, err)
+	require.True(t, elapsed > 200*time.Millisecond,
+		"Event should have been delivered after blocking 200ms, took %s", elapsed)
+	require.True(t, elapsed < 2*time.Second,
+		"Event should have been delivered after a short time, took %s", elapsed)
+	require.Len(t, got, 1)
+	require.Equal(t, index, got[0].Index)
+
+	// An event with the wrong key should not be delivered. Deliver a good message
+	// right after it so we don't have to block the test thread or cancel yet.
+	index++
+	testPublish(index, eb, "nope")
+	index++
+	testPublish(index, eb, "test")
+
+	start = time.Now()
+	got, err = sub.Next()
+	elapsed = time.Since(start)
+	require.NoError(t, err)
+	require.True(t, elapsed < 200*time.Millisecond,
+		"Event should have been delivered immediately, took %s", elapsed)
+	require.Len(t, got, 1)
+	require.Equal(t, index, got[0].Index)
+	require.Equal(t, "test", got[0].Key)
+
+	// Cancelling the subscription context should unblock Next.
+	start = time.Now()
+	time.AfterFunc(200*time.Millisecond, func() {
+		cancel()
+	})
+
+	_, err = sub.Next()
+	elapsed = time.Since(start)
+	require.Error(t, err)
+	require.True(t, elapsed > 200*time.Millisecond,
+		"Cancel should have unblocked Next after blocking 200ms, took %s", elapsed)
+	require.True(t, elapsed < 2*time.Second,
+		"Cancel should have unblocked Next after a short time, took %s", elapsed)
+}
+
+func TestSubscriptionCloseReload(t *testing.T) {
+	eb := NewEventBuffer()
+
+	index := uint64(100)
+
+	startHead := eb.Head()
+
+	// Start with an event in the buffer.
+	testPublish(index, eb, "test")
+
+	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+	defer cancel()
+
+	// Create a subscription.
+	req := &agentpb.SubscribeRequest{
+		Topic: agentpb.Topic_ServiceHealth,
+		Key:   "test",
+	}
+	sub := NewSubscription(ctx, req, startHead)
+
+	// The first call to sub.Next should return our published event immediately.
+	start := time.Now()
+	got, err := sub.Next()
+	elapsed := time.Since(start)
+	require.NoError(t, err)
+	require.True(t, elapsed < 200*time.Millisecond,
+		"Event should have been delivered immediately, took %s", elapsed)
+	require.Len(t, got, 1)
+	require.Equal(t, index, got[0].Index)
+
+	// Schedule a CloseReload simulating the server deciding this subscription
+	// needs to reset (e.g. on an ACL permission change).
+	start = time.Now()
+	time.AfterFunc(200*time.Millisecond, func() {
+		sub.CloseReload()
+	})
+
+	_, err = sub.Next()
+	elapsed = time.Since(start)
+	require.Error(t, err)
+	require.Equal(t, ErrSubscriptionReload, err)
+	require.True(t, elapsed > 200*time.Millisecond,
+		"Reload should have happened after blocking 200ms, took %s", elapsed)
+	require.True(t, elapsed < 2*time.Second,
+		"Reload should have been delivered after a short time, took %s", elapsed)
+}
+
+func testPublish(index uint64, b *EventBuffer, key string) {
+	// We don't care about the event payload for now, just the semantics of
+	// publishing something. This is not a valid stream in the end-to-end
+	// streaming protocol, but it is enough to test subscription mechanics.
+	e := agentpb.Event{
+		Index: index,
+		Topic: agentpb.Topic_ServiceHealth,
+		Key:   key,
+		Payload: &agentpb.Event_EndOfSnapshot{
+			EndOfSnapshot: true,
+		},
+	}
+	b.Append([]agentpb.Event{e})
+}
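Finally, a hedged sketch of how CloseReload might be driven when permissions change; the bookkeeping shown is hypothetical and not part of this change.

    // affectedSubs stands in for whatever index the server keeps from tokens or
    // keys to live subscriptions; it does not exist in this diff.
    for _, sub := range affectedSubs {
        // Safe to call from any goroutine: a Next call blocked in another
        // goroutine returns ErrSubscriptionReload, and the handler can then tell
        // the client to resubscribe.
        sub.CloseReload()
    }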