mirror of https://github.com/status-im/consul.git
Add streaming package with Subscription and Snapshot components.
The remaining files from 7965767de0bd62ab07669b85d6879bd5f815d157 Co-authored-by: Paul Banks <banks@banksco.de>
This commit is contained in:
parent
2a040342ba
commit
c0b0109e80
|
@ -0,0 +1,81 @@
|
||||||
|
package state
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/hashicorp/consul/agent/agentpb"
|
||||||
|
"github.com/hashicorp/consul/agent/structs"
|
||||||
|
memdb "github.com/hashicorp/go-memdb"
|
||||||
|
)
|
||||||
|
|
||||||
|
// ACLEventsFromChanges returns all the ACL token, policy or role events that
|
||||||
|
// should be emitted given a set of changes to the state store.
|
||||||
|
func (s *Store) ACLEventsFromChanges(tx *txn, changes memdb.Changes) ([]agentpb.Event, error) {
|
||||||
|
|
||||||
|
// Don't allocate yet since in majority of update transactions no ACL token
|
||||||
|
// will be changed.
|
||||||
|
var events []agentpb.Event
|
||||||
|
|
||||||
|
getObj := func(change memdb.Change) interface{} {
|
||||||
|
if change.Deleted() {
|
||||||
|
return change.Before
|
||||||
|
}
|
||||||
|
return change.After
|
||||||
|
}
|
||||||
|
|
||||||
|
getOp := func(change memdb.Change) agentpb.ACLOp {
|
||||||
|
if change.Deleted() {
|
||||||
|
return agentpb.ACLOp_Delete
|
||||||
|
}
|
||||||
|
return agentpb.ACLOp_Update
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, change := range changes {
|
||||||
|
switch change.Table {
|
||||||
|
case "acl-tokens":
|
||||||
|
token := getObj(change).(*structs.ACLToken)
|
||||||
|
e := agentpb.Event{
|
||||||
|
Topic: agentpb.Topic_ACLTokens,
|
||||||
|
Index: tx.Index,
|
||||||
|
Payload: &agentpb.Event_ACLToken{
|
||||||
|
ACLToken: &agentpb.ACLTokenUpdate{
|
||||||
|
Op: getOp(change),
|
||||||
|
Token: &agentpb.ACLTokenIdentifier{
|
||||||
|
AccessorID: token.AccessorID,
|
||||||
|
SecretID: token.SecretID,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
events = append(events, e)
|
||||||
|
case "acl-policies":
|
||||||
|
policy := getObj(change).(*structs.ACLPolicy)
|
||||||
|
e := agentpb.Event{
|
||||||
|
Topic: agentpb.Topic_ACLPolicies,
|
||||||
|
Index: tx.Index,
|
||||||
|
Payload: &agentpb.Event_ACLPolicy{
|
||||||
|
ACLPolicy: &agentpb.ACLPolicyUpdate{
|
||||||
|
Op: getOp(change),
|
||||||
|
PolicyID: policy.ID,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
events = append(events, e)
|
||||||
|
case "acl-roles":
|
||||||
|
role := getObj(change).(*structs.ACLRole)
|
||||||
|
e := agentpb.Event{
|
||||||
|
Topic: agentpb.Topic_ACLRoles,
|
||||||
|
Index: tx.Index,
|
||||||
|
Payload: &agentpb.Event_ACLRole{
|
||||||
|
ACLRole: &agentpb.ACLRoleUpdate{
|
||||||
|
Op: getOp(change),
|
||||||
|
RoleID: role.ID,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
events = append(events, e)
|
||||||
|
default:
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return events, nil
|
||||||
|
}
|
|
@ -0,0 +1,342 @@
|
||||||
|
package state
|
||||||
|
|
||||||
|
import (
|
||||||
|
"strconv"
|
||||||
|
"strings"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/hashicorp/consul/agent/agentpb"
|
||||||
|
"github.com/hashicorp/consul/agent/structs"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
func testACLTokenEvent(t *testing.T, idx uint64, n int, delete bool) agentpb.Event {
|
||||||
|
t.Helper()
|
||||||
|
uuid := strings.ReplaceAll("11111111-????-????-????-????????????", "?",
|
||||||
|
strconv.Itoa(n))
|
||||||
|
op := agentpb.ACLOp_Update
|
||||||
|
if delete {
|
||||||
|
op = agentpb.ACLOp_Delete
|
||||||
|
}
|
||||||
|
return agentpb.Event{
|
||||||
|
Topic: agentpb.Topic_ACLTokens,
|
||||||
|
Index: idx,
|
||||||
|
Payload: &agentpb.Event_ACLToken{
|
||||||
|
ACLToken: &agentpb.ACLTokenUpdate{
|
||||||
|
Op: op,
|
||||||
|
Token: &agentpb.ACLTokenIdentifier{
|
||||||
|
AccessorID: uuid,
|
||||||
|
SecretID: uuid,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func testACLPolicyEvent(t *testing.T, idx uint64, n int, delete bool) agentpb.Event {
|
||||||
|
t.Helper()
|
||||||
|
uuid := strings.ReplaceAll("22222222-????-????-????-????????????", "?",
|
||||||
|
strconv.Itoa(n))
|
||||||
|
op := agentpb.ACLOp_Update
|
||||||
|
if delete {
|
||||||
|
op = agentpb.ACLOp_Delete
|
||||||
|
}
|
||||||
|
return agentpb.Event{
|
||||||
|
Topic: agentpb.Topic_ACLPolicies,
|
||||||
|
Index: idx,
|
||||||
|
Payload: &agentpb.Event_ACLPolicy{
|
||||||
|
ACLPolicy: &agentpb.ACLPolicyUpdate{
|
||||||
|
Op: op,
|
||||||
|
PolicyID: uuid,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func testACLRoleEvent(t *testing.T, idx uint64, n int, delete bool) agentpb.Event {
|
||||||
|
t.Helper()
|
||||||
|
uuid := strings.ReplaceAll("33333333-????-????-????-????????????", "?",
|
||||||
|
strconv.Itoa(n))
|
||||||
|
op := agentpb.ACLOp_Update
|
||||||
|
if delete {
|
||||||
|
op = agentpb.ACLOp_Delete
|
||||||
|
}
|
||||||
|
return agentpb.Event{
|
||||||
|
Topic: agentpb.Topic_ACLRoles,
|
||||||
|
Index: idx,
|
||||||
|
Payload: &agentpb.Event_ACLRole{
|
||||||
|
ACLRole: &agentpb.ACLRoleUpdate{
|
||||||
|
Op: op,
|
||||||
|
RoleID: uuid,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func testToken(t *testing.T, n int) *structs.ACLToken {
|
||||||
|
uuid := strings.ReplaceAll("11111111-????-????-????-????????????", "?",
|
||||||
|
strconv.Itoa(n))
|
||||||
|
return &structs.ACLToken{
|
||||||
|
AccessorID: uuid,
|
||||||
|
SecretID: uuid,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func testPolicy(t *testing.T, n int) *structs.ACLPolicy {
|
||||||
|
numStr := strconv.Itoa(n)
|
||||||
|
uuid := strings.ReplaceAll("22222222-????-????-????-????????????", "?", numStr)
|
||||||
|
return &structs.ACLPolicy{
|
||||||
|
ID: uuid,
|
||||||
|
Name: "test_policy_" + numStr,
|
||||||
|
Rules: `operator = "read"`,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func testRole(t *testing.T, n, p int) *structs.ACLRole {
|
||||||
|
numStr := strconv.Itoa(n)
|
||||||
|
uuid := strings.ReplaceAll("33333333-????-????-????-????????????", "?", numStr)
|
||||||
|
policy := testPolicy(t, p)
|
||||||
|
return &structs.ACLRole{
|
||||||
|
ID: uuid,
|
||||||
|
Name: "test_role_" + numStr,
|
||||||
|
Policies: []structs.ACLRolePolicyLink{{
|
||||||
|
ID: policy.ID,
|
||||||
|
Name: policy.Name,
|
||||||
|
}},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestACLEventsFromChanges(t *testing.T) {
|
||||||
|
cases := []struct {
|
||||||
|
Name string
|
||||||
|
Setup func(s *Store, tx *txn) error
|
||||||
|
Mutate func(s *Store, tx *txn) error
|
||||||
|
WantEvents []agentpb.Event
|
||||||
|
WantErr bool
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
Name: "token create",
|
||||||
|
Mutate: func(s *Store, tx *txn) error {
|
||||||
|
if err := s.aclTokenSetTxn(tx, tx.Index, testToken(t, 1), false, false, false, false); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
},
|
||||||
|
WantEvents: []agentpb.Event{
|
||||||
|
testACLTokenEvent(t, 100, 1, false),
|
||||||
|
},
|
||||||
|
WantErr: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Name: "token update",
|
||||||
|
Setup: func(s *Store, tx *txn) error {
|
||||||
|
if err := s.aclTokenSetTxn(tx, tx.Index, testToken(t, 1), false, false, false, false); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
},
|
||||||
|
Mutate: func(s *Store, tx *txn) error {
|
||||||
|
// Add a policy to the token (never mind it doesn't exist for now) we
|
||||||
|
// allow it in the set command below.
|
||||||
|
token := testToken(t, 1)
|
||||||
|
token.Policies = []structs.ACLTokenPolicyLink{{ID: "33333333-1111-1111-1111-111111111111"}}
|
||||||
|
if err := s.aclTokenSetTxn(tx, tx.Index, token, false, true, false, false); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
},
|
||||||
|
WantEvents: []agentpb.Event{
|
||||||
|
// Should see an event from the update
|
||||||
|
testACLTokenEvent(t, 100, 1, false),
|
||||||
|
},
|
||||||
|
WantErr: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Name: "token delete",
|
||||||
|
Setup: func(s *Store, tx *txn) error {
|
||||||
|
if err := s.aclTokenSetTxn(tx, tx.Index, testToken(t, 1), false, false, false, false); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
},
|
||||||
|
Mutate: func(s *Store, tx *txn) error {
|
||||||
|
// Delete it
|
||||||
|
token := testToken(t, 1)
|
||||||
|
if err := s.aclTokenDeleteTxn(tx, tx.Index, token.AccessorID, "id", nil); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
},
|
||||||
|
WantEvents: []agentpb.Event{
|
||||||
|
// Should see a delete event
|
||||||
|
testACLTokenEvent(t, 100, 1, true),
|
||||||
|
},
|
||||||
|
WantErr: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Name: "policy create",
|
||||||
|
Mutate: func(s *Store, tx *txn) error {
|
||||||
|
if err := s.aclPolicySetTxn(tx, tx.Index, testPolicy(t, 1)); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
},
|
||||||
|
WantEvents: []agentpb.Event{
|
||||||
|
testACLPolicyEvent(t, 100, 1, false),
|
||||||
|
},
|
||||||
|
WantErr: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Name: "policy update",
|
||||||
|
Setup: func(s *Store, tx *txn) error {
|
||||||
|
if err := s.aclPolicySetTxn(tx, tx.Index, testPolicy(t, 1)); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
},
|
||||||
|
Mutate: func(s *Store, tx *txn) error {
|
||||||
|
policy := testPolicy(t, 1)
|
||||||
|
policy.Rules = `operator = "write"`
|
||||||
|
if err := s.aclPolicySetTxn(tx, tx.Index, policy); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
},
|
||||||
|
WantEvents: []agentpb.Event{
|
||||||
|
// Should see an event from the update
|
||||||
|
testACLPolicyEvent(t, 100, 1, false),
|
||||||
|
},
|
||||||
|
WantErr: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Name: "policy delete",
|
||||||
|
Setup: func(s *Store, tx *txn) error {
|
||||||
|
if err := s.aclPolicySetTxn(tx, tx.Index, testPolicy(t, 1)); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
},
|
||||||
|
Mutate: func(s *Store, tx *txn) error {
|
||||||
|
// Delete it
|
||||||
|
policy := testPolicy(t, 1)
|
||||||
|
if err := s.aclPolicyDeleteTxn(tx, tx.Index, policy.ID, s.aclPolicyGetByID, nil); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
},
|
||||||
|
WantEvents: []agentpb.Event{
|
||||||
|
// Should see a delete event
|
||||||
|
testACLPolicyEvent(t, 100, 1, true),
|
||||||
|
},
|
||||||
|
WantErr: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Name: "role create",
|
||||||
|
Mutate: func(s *Store, tx *txn) error {
|
||||||
|
if err := s.aclRoleSetTxn(tx, tx.Index, testRole(t, 1, 1), true); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
},
|
||||||
|
WantEvents: []agentpb.Event{
|
||||||
|
testACLRoleEvent(t, 100, 1, false),
|
||||||
|
},
|
||||||
|
WantErr: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Name: "role update",
|
||||||
|
Setup: func(s *Store, tx *txn) error {
|
||||||
|
if err := s.aclRoleSetTxn(tx, tx.Index, testRole(t, 1, 1), true); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
},
|
||||||
|
Mutate: func(s *Store, tx *txn) error {
|
||||||
|
role := testRole(t, 1, 1)
|
||||||
|
policy2 := testPolicy(t, 2)
|
||||||
|
role.Policies = append(role.Policies, structs.ACLRolePolicyLink{
|
||||||
|
ID: policy2.ID,
|
||||||
|
Name: policy2.Name,
|
||||||
|
})
|
||||||
|
if err := s.aclRoleSetTxn(tx, tx.Index, role, true); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
},
|
||||||
|
WantEvents: []agentpb.Event{
|
||||||
|
// Should see an event from the update
|
||||||
|
testACLRoleEvent(t, 100, 1, false),
|
||||||
|
},
|
||||||
|
WantErr: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Name: "role delete",
|
||||||
|
Setup: func(s *Store, tx *txn) error {
|
||||||
|
if err := s.aclRoleSetTxn(tx, tx.Index, testRole(t, 1, 1), true); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
},
|
||||||
|
Mutate: func(s *Store, tx *txn) error {
|
||||||
|
// Delete it
|
||||||
|
role := testRole(t, 1, 1)
|
||||||
|
if err := s.aclRoleDeleteTxn(tx, tx.Index, role.ID, s.aclRoleGetByID, nil); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
},
|
||||||
|
WantEvents: []agentpb.Event{
|
||||||
|
// Should see a delete event
|
||||||
|
testACLRoleEvent(t, 100, 1, true),
|
||||||
|
},
|
||||||
|
WantErr: false,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tc := range cases {
|
||||||
|
tc := tc
|
||||||
|
t.Run(tc.Name, func(t *testing.T) {
|
||||||
|
s := testStateStore(t)
|
||||||
|
|
||||||
|
if tc.Setup != nil {
|
||||||
|
// Bypass the publish mechanism for this test or we get into odd
|
||||||
|
// recursive stuff...
|
||||||
|
setupTx := s.db.WriteTxn(10)
|
||||||
|
require.NoError(t, tc.Setup(s, setupTx))
|
||||||
|
// Commit the underlying transaction without using wrapped Commit so we
|
||||||
|
// avoid the whole event publishing system for setup here. It _should_
|
||||||
|
// work but it makes debugging test hard as it will call the function
|
||||||
|
// under test for the setup data...
|
||||||
|
setupTx.Txn.Commit()
|
||||||
|
}
|
||||||
|
|
||||||
|
tx := s.db.WriteTxn(100)
|
||||||
|
require.NoError(t, tc.Mutate(s, tx))
|
||||||
|
|
||||||
|
// Note we call the func under test directly rather than publishChanges so
|
||||||
|
// we can test this in isolation.
|
||||||
|
got, err := s.ACLEventsFromChanges(tx, tx.Changes())
|
||||||
|
if tc.WantErr {
|
||||||
|
require.Error(t, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
// Make sure we have the right events, only taking ordering into account
|
||||||
|
// where it matters to account for non-determinism.
|
||||||
|
requireEventsInCorrectPartialOrder(t, tc.WantEvents, got, func(e agentpb.Event) string {
|
||||||
|
// We only care that events affecting the same actual token are ordered
|
||||||
|
// with respect ot each other so use it's ID as the key.
|
||||||
|
switch v := e.Payload.(type) {
|
||||||
|
case *agentpb.Event_ACLToken:
|
||||||
|
return "token:" + v.ACLToken.Token.AccessorID
|
||||||
|
case *agentpb.Event_ACLPolicy:
|
||||||
|
return "policy:" + v.ACLPolicy.PolicyID
|
||||||
|
case *agentpb.Event_ACLRole:
|
||||||
|
return "role:" + v.ACLRole.RoleID
|
||||||
|
}
|
||||||
|
return ""
|
||||||
|
})
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,388 @@
|
||||||
|
package state
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/hashicorp/go-memdb"
|
||||||
|
"golang.org/x/crypto/blake2b"
|
||||||
|
|
||||||
|
"github.com/hashicorp/consul/agent/agentpb"
|
||||||
|
"github.com/hashicorp/consul/agent/consul/stream"
|
||||||
|
"github.com/hashicorp/consul/agent/structs"
|
||||||
|
)
|
||||||
|
|
||||||
|
type EventPublisher struct {
|
||||||
|
store *Store
|
||||||
|
|
||||||
|
// topicBufferSize controls how many trailing events we keep in memory for
|
||||||
|
// each topic to avoid needing to snapshot again for re-connecting clients
|
||||||
|
// that may have missed some events. It may be zero for no buffering (the most
|
||||||
|
// recent event is always kept though). TODO
|
||||||
|
topicBufferSize int
|
||||||
|
|
||||||
|
// snapCacheTTL controls how long we keep snapshots in our cache before
|
||||||
|
// allowing them to be garbage collected and a new one made for subsequent
|
||||||
|
// requests for that topic and key. In general this should be pretty short to
|
||||||
|
// keep memory overhead of duplicated event data low - snapshots are typically
|
||||||
|
// not that expensive, but having a cache for a few seconds can help
|
||||||
|
// de-duplicate building the same snapshot over and over again when a
|
||||||
|
// thundering herd of watchers all subscribe to the same topic within a few
|
||||||
|
// seconds. TODO
|
||||||
|
snapCacheTTL time.Duration
|
||||||
|
|
||||||
|
// This lock protects the topicBuffers, snapCache and subsByToken maps.
|
||||||
|
lock sync.RWMutex
|
||||||
|
|
||||||
|
// topicBuffers stores the head of the linked-list buffer to publish events to
|
||||||
|
// for a topic.
|
||||||
|
topicBuffers map[agentpb.Topic]*stream.EventBuffer
|
||||||
|
|
||||||
|
// snapCache stores the head of any snapshot buffers still in cache if caching
|
||||||
|
// is enabled.
|
||||||
|
snapCache map[agentpb.Topic]map[string]*stream.EventSnapshot
|
||||||
|
|
||||||
|
// snapFns is the set of snapshot functions that were registered bound to the
|
||||||
|
// state store.
|
||||||
|
snapFns map[agentpb.Topic]stream.SnapFn
|
||||||
|
|
||||||
|
// subsByToken stores a list of Subscription objects outstanding indexed by a
|
||||||
|
// hash of the ACL token they used to subscribe so we can reload them if their
|
||||||
|
// ACL permissions change.
|
||||||
|
subsByToken map[string]map[*agentpb.SubscribeRequest]*stream.Subscription
|
||||||
|
|
||||||
|
// commitCh decouples the Commit call in the FSM hot path from distributing
|
||||||
|
// the resulting events.
|
||||||
|
commitCh chan commitUpdate
|
||||||
|
}
|
||||||
|
|
||||||
|
type commitUpdate struct {
|
||||||
|
tx *txnWrapper
|
||||||
|
events []agentpb.Event
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewEventPublisher(store *Store, topicBufferSize int, snapCacheTTL time.Duration) *EventPublisher {
|
||||||
|
e := &EventPublisher{
|
||||||
|
store: store,
|
||||||
|
topicBufferSize: topicBufferSize,
|
||||||
|
snapCacheTTL: snapCacheTTL,
|
||||||
|
topicBuffers: make(map[agentpb.Topic]*stream.EventBuffer),
|
||||||
|
snapCache: make(map[agentpb.Topic]map[string]*stream.EventSnapshot),
|
||||||
|
snapFns: make(map[agentpb.Topic]stream.SnapFn),
|
||||||
|
subsByToken: make(map[string]map[*agentpb.SubscribeRequest]*stream.Subscription),
|
||||||
|
commitCh: make(chan commitUpdate, 64),
|
||||||
|
}
|
||||||
|
|
||||||
|
// create a local handler table
|
||||||
|
// TODO: document why
|
||||||
|
for topic, handlers := range topicRegistry {
|
||||||
|
fnCopy := handlers.Snapshot
|
||||||
|
e.snapFns[topic] = func(req *agentpb.SubscribeRequest, buf *stream.EventBuffer) (uint64, error) {
|
||||||
|
return fnCopy(e.store, req, buf)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
go e.handleUpdates()
|
||||||
|
|
||||||
|
return e
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *EventPublisher) publishChanges(tx *txn, changes memdb.Changes) error {
|
||||||
|
var events []agentpb.Event
|
||||||
|
for topic, th := range topicRegistry {
|
||||||
|
if th.ProcessChanges != nil {
|
||||||
|
es, err := th.ProcessChanges(e.store, tx, changes)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed generating events for topic %q: %s", topic, err)
|
||||||
|
}
|
||||||
|
events = append(events, es...)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
e.commitCh <- commitUpdate{
|
||||||
|
// TODO: document why it must be created here, and not in the new thread
|
||||||
|
//
|
||||||
|
// Create a new transaction since it's going to be used from a different
|
||||||
|
// thread. Transactions aren't thread safe but it's OK to create it here
|
||||||
|
// since we won't try to use it in this thread and pass it straight to the
|
||||||
|
// handler which will own it exclusively.
|
||||||
|
tx: e.store.db.Txn(false),
|
||||||
|
events: events,
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *EventPublisher) handleUpdates() {
|
||||||
|
for {
|
||||||
|
update := <-e.commitCh
|
||||||
|
e.sendEvents(update)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// sendEvents sends the given events to any applicable topic listeners, as well
|
||||||
|
// as any ACL update events to cause affected listeners to reset their stream.
|
||||||
|
func (e *EventPublisher) sendEvents(update commitUpdate) {
|
||||||
|
e.lock.Lock()
|
||||||
|
defer e.lock.Unlock()
|
||||||
|
|
||||||
|
// Always abort the transaction. This is not strictly necessary with memDB
|
||||||
|
// because once we drop the reference to the Txn object, the radix nodes will
|
||||||
|
// be GCed anyway but it's hygienic incase memDB ever has a different
|
||||||
|
// implementation.
|
||||||
|
defer update.tx.Abort()
|
||||||
|
|
||||||
|
eventsByTopic := make(map[agentpb.Topic][]agentpb.Event)
|
||||||
|
|
||||||
|
for _, event := range update.events {
|
||||||
|
// If the event is an ACL update, treat it as a special case. Currently
|
||||||
|
// ACL update events are only used internally to recognize when a subscriber
|
||||||
|
// should reload its subscription.
|
||||||
|
if event.Topic == agentpb.Topic_ACLTokens ||
|
||||||
|
event.Topic == agentpb.Topic_ACLPolicies ||
|
||||||
|
event.Topic == agentpb.Topic_ACLRoles {
|
||||||
|
|
||||||
|
if err := e.handleACLUpdate(update.tx, event); err != nil {
|
||||||
|
// This seems pretty drastic? What would be better. It's not super safe
|
||||||
|
// to continue since we might have missed some ACL update and so leak
|
||||||
|
// data to unauthorized clients but crashing whole server also seems
|
||||||
|
// bad. I wonder if we could send a "reset" to all subscribers instead
|
||||||
|
// and effectively re-start all subscriptions to be on the safe side
|
||||||
|
// without just crashing?
|
||||||
|
// TODO(banks): reset all instead of panic?
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// Split events by topic to deliver.
|
||||||
|
eventsByTopic[event.Topic] = append(eventsByTopic[event.Topic], event)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Deliver events
|
||||||
|
for topic, events := range eventsByTopic {
|
||||||
|
buf, ok := e.topicBuffers[topic]
|
||||||
|
if !ok {
|
||||||
|
buf = stream.NewEventBuffer()
|
||||||
|
e.topicBuffers[topic] = buf
|
||||||
|
}
|
||||||
|
buf.Append(events)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// handleACLUpdate handles an ACL token/policy/role update. This method assumes
|
||||||
|
// the lock is held.
|
||||||
|
func (e *EventPublisher) handleACLUpdate(tx *txn, event agentpb.Event) error {
|
||||||
|
switch event.Topic {
|
||||||
|
case agentpb.Topic_ACLTokens:
|
||||||
|
token := event.GetACLToken()
|
||||||
|
subs := e.subsByToken[secretHash(token.Token.SecretID)]
|
||||||
|
for _, sub := range subs {
|
||||||
|
sub.CloseReload()
|
||||||
|
}
|
||||||
|
case agentpb.Topic_ACLPolicies:
|
||||||
|
policy := event.GetACLPolicy()
|
||||||
|
// TODO(streaming) figure out how to thread method/ent meta here for
|
||||||
|
// namespace support in Ent. Probably need wildcard here?
|
||||||
|
tokens, err := e.store.aclTokenListByPolicy(tx, policy.PolicyID, nil)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Loop through the tokens used by the policy.
|
||||||
|
for token := tokens.Next(); token != nil; token = tokens.Next() {
|
||||||
|
token := token.(*structs.ACLToken)
|
||||||
|
if subs, ok := e.subsByToken[secretHash(token.SecretID)]; ok {
|
||||||
|
for _, sub := range subs {
|
||||||
|
sub.CloseReload()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Find any roles using this policy so tokens with those roles can be reloaded.
|
||||||
|
roles, err := e.store.aclRoleListByPolicy(tx, policy.PolicyID, nil)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
for role := roles.Next(); role != nil; role = roles.Next() {
|
||||||
|
role := role.(*structs.ACLRole)
|
||||||
|
|
||||||
|
// TODO(streaming) figure out how to thread method/ent meta here for
|
||||||
|
// namespace support in Ent.
|
||||||
|
tokens, err := e.store.aclTokenListByRole(tx, role.ID, nil)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
for token := tokens.Next(); token != nil; token = tokens.Next() {
|
||||||
|
token := token.(*structs.ACLToken)
|
||||||
|
if subs, ok := e.subsByToken[secretHash(token.SecretID)]; ok {
|
||||||
|
for _, sub := range subs {
|
||||||
|
sub.CloseReload()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
case agentpb.Topic_ACLRoles:
|
||||||
|
role := event.GetACLRole()
|
||||||
|
// TODO(streaming) figure out how to thread method/ent meta here for
|
||||||
|
// namespace support in Ent.
|
||||||
|
tokens, err := e.store.aclTokenListByRole(tx, role.RoleID, nil)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
for token := tokens.Next(); token != nil; token = tokens.Next() {
|
||||||
|
token := token.(*structs.ACLToken)
|
||||||
|
if subs, ok := e.subsByToken[secretHash(token.SecretID)]; ok {
|
||||||
|
for _, sub := range subs {
|
||||||
|
sub.CloseReload()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// secretHash returns a 256-bit Blake2 hash of the given string.
|
||||||
|
func secretHash(token string) string {
|
||||||
|
hash, err := blake2b.New256(nil)
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
hash.Write([]byte(token))
|
||||||
|
return string(hash.Sum(nil))
|
||||||
|
}
|
||||||
|
|
||||||
|
// Subscribe returns a new stream.Subscription for the given request. A
|
||||||
|
// subscription will stream an initial snapshot of events matching the request
|
||||||
|
// if required and then block until new events that modify the request occur, or
|
||||||
|
// the context is cancelled. Subscriptions may be forced to reset if the server
|
||||||
|
// decides it can no longer maintain correct operation for example if ACL
|
||||||
|
// policies changed or the state store was restored.
|
||||||
|
//
|
||||||
|
// When the called is finished with the subscription for any reason, it must
|
||||||
|
// call Unsubscribe to free ACL tracking resources.
|
||||||
|
func (e *EventPublisher) Subscribe(ctx context.Context,
|
||||||
|
req *agentpb.SubscribeRequest) (*stream.Subscription, error) {
|
||||||
|
// Ensure we know how to make a snapshot for this topic
|
||||||
|
_, ok := topicRegistry[req.Topic]
|
||||||
|
if !ok {
|
||||||
|
return nil, fmt.Errorf("unknown topic %s", req.Topic)
|
||||||
|
}
|
||||||
|
|
||||||
|
e.lock.Lock()
|
||||||
|
defer e.lock.Unlock()
|
||||||
|
|
||||||
|
// Ensure there is a topic buffer for that topic so we start capturing any
|
||||||
|
// future published events.
|
||||||
|
buf, ok := e.topicBuffers[req.Topic]
|
||||||
|
if !ok {
|
||||||
|
buf = stream.NewEventBuffer()
|
||||||
|
e.topicBuffers[req.Topic] = buf
|
||||||
|
}
|
||||||
|
|
||||||
|
// See if we need a snapshot
|
||||||
|
topicHead := buf.Head()
|
||||||
|
var sub *stream.Subscription
|
||||||
|
if req.Index > 0 && len(topicHead.Events) > 0 && topicHead.Events[0].Index == req.Index {
|
||||||
|
// No need for a snapshot just send the "end snapshot" message to signal to
|
||||||
|
// client it's cache is still good. (note that this can be distinguished
|
||||||
|
// from a legitimate empty snapshot due to the index matching the one the
|
||||||
|
// client sent), then follow along from here in the topic.
|
||||||
|
e := agentpb.Event{
|
||||||
|
Index: req.Index,
|
||||||
|
Topic: req.Topic,
|
||||||
|
Key: req.Key,
|
||||||
|
Payload: &agentpb.Event_ResumeStream{ResumeStream: true},
|
||||||
|
}
|
||||||
|
// Make a new buffer to send to the client containing the resume.
|
||||||
|
buf := stream.NewEventBuffer()
|
||||||
|
|
||||||
|
// Store the head of that buffer before we append to it to give as the
|
||||||
|
// starting point for the subscription.
|
||||||
|
subHead := buf.Head()
|
||||||
|
|
||||||
|
buf.Append([]agentpb.Event{e})
|
||||||
|
|
||||||
|
// Now splice the rest of the topic buffer on so the subscription will
|
||||||
|
// continue to see future updates in the topic buffer.
|
||||||
|
follow, err := topicHead.FollowAfter()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
buf.AppendBuffer(follow)
|
||||||
|
|
||||||
|
sub = stream.NewSubscription(ctx, req, subHead)
|
||||||
|
} else {
|
||||||
|
snap, err := e.getSnapshotLocked(req, topicHead)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
sub = stream.NewSubscription(ctx, req, snap.Snap)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Add the subscription to the ACL token map.
|
||||||
|
tokenHash := secretHash(req.Token)
|
||||||
|
subsByToken, ok := e.subsByToken[tokenHash]
|
||||||
|
if !ok {
|
||||||
|
subsByToken = make(map[*agentpb.SubscribeRequest]*stream.Subscription)
|
||||||
|
e.subsByToken[tokenHash] = subsByToken
|
||||||
|
}
|
||||||
|
subsByToken[req] = sub
|
||||||
|
|
||||||
|
return sub, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Unsubscribe must be called when a client is no longer interested in a
|
||||||
|
// subscription to free resources monitoring changes in it's ACL token. The same
|
||||||
|
// request object passed to Subscribe must be used.
|
||||||
|
func (e *EventPublisher) Unsubscribe(req *agentpb.SubscribeRequest) {
|
||||||
|
e.lock.Lock()
|
||||||
|
defer e.lock.Unlock()
|
||||||
|
|
||||||
|
tokenHash := secretHash(req.Token)
|
||||||
|
subsByToken, ok := e.subsByToken[tokenHash]
|
||||||
|
if !ok {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
delete(subsByToken, req)
|
||||||
|
if len(subsByToken) == 0 {
|
||||||
|
delete(e.subsByToken, tokenHash)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *EventPublisher) getSnapshotLocked(req *agentpb.SubscribeRequest, topicHead *stream.BufferItem) (*stream.EventSnapshot, error) {
|
||||||
|
// See if there is a cached snapshot
|
||||||
|
topicSnaps, ok := e.snapCache[req.Topic]
|
||||||
|
if !ok {
|
||||||
|
topicSnaps = make(map[string]*stream.EventSnapshot)
|
||||||
|
e.snapCache[req.Topic] = topicSnaps
|
||||||
|
}
|
||||||
|
|
||||||
|
snap, ok := topicSnaps[req.Key]
|
||||||
|
if ok && snap.Err() == nil {
|
||||||
|
return snap, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// No snap or errored snap in cache, create a new one
|
||||||
|
snapFn, ok := e.snapFns[req.Topic]
|
||||||
|
if !ok {
|
||||||
|
return nil, fmt.Errorf("unknown topic %s", req.Topic)
|
||||||
|
}
|
||||||
|
|
||||||
|
snap = stream.NewEventSnapshot(req, topicHead, snapFn)
|
||||||
|
if e.snapCacheTTL > 0 {
|
||||||
|
topicSnaps[req.Key] = snap
|
||||||
|
|
||||||
|
// Trigger a clearout after TTL
|
||||||
|
time.AfterFunc(e.snapCacheTTL, func() {
|
||||||
|
e.lock.Lock()
|
||||||
|
defer e.lock.Unlock()
|
||||||
|
delete(topicSnaps, req.Key)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
return snap, nil
|
||||||
|
}
|
|
@ -0,0 +1,454 @@
|
||||||
|
package state
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/hashicorp/consul/acl"
|
||||||
|
"github.com/hashicorp/consul/agent/agentpb"
|
||||||
|
"github.com/hashicorp/consul/agent/consul/stream"
|
||||||
|
"github.com/hashicorp/consul/agent/structs"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
type nextResult struct {
|
||||||
|
Events []agentpb.Event
|
||||||
|
Err error
|
||||||
|
}
|
||||||
|
|
||||||
|
func testRunSub(sub *stream.Subscription) <-chan nextResult {
|
||||||
|
eventCh := make(chan nextResult, 1)
|
||||||
|
go func() {
|
||||||
|
for {
|
||||||
|
es, err := sub.Next()
|
||||||
|
eventCh <- nextResult{
|
||||||
|
Events: es,
|
||||||
|
Err: err,
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
return eventCh
|
||||||
|
}
|
||||||
|
|
||||||
|
func assertNoEvent(t *testing.T, eventCh <-chan nextResult) {
|
||||||
|
t.Helper()
|
||||||
|
select {
|
||||||
|
case next := <-eventCh:
|
||||||
|
require.NoError(t, next.Err)
|
||||||
|
require.Len(t, next.Events, 1)
|
||||||
|
t.Fatalf("got unwanted event: %#v", next.Events[0].GetPayload())
|
||||||
|
case <-time.After(100 * time.Millisecond):
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func assertEvent(t *testing.T, eventCh <-chan nextResult) *agentpb.Event {
|
||||||
|
t.Helper()
|
||||||
|
select {
|
||||||
|
case next := <-eventCh:
|
||||||
|
require.NoError(t, next.Err)
|
||||||
|
require.Len(t, next.Events, 1)
|
||||||
|
return &next.Events[0]
|
||||||
|
case <-time.After(100 * time.Millisecond):
|
||||||
|
t.Fatalf("no event after 100ms")
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func assertErr(t *testing.T, eventCh <-chan nextResult) error {
|
||||||
|
t.Helper()
|
||||||
|
select {
|
||||||
|
case next := <-eventCh:
|
||||||
|
require.Error(t, next.Err)
|
||||||
|
return next.Err
|
||||||
|
case <-time.After(100 * time.Millisecond):
|
||||||
|
t.Fatalf("no err after 100ms")
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// assertReset checks that a ResetStream event is send to the subscription
|
||||||
|
// within 100ms. If allowEOS is true it will ignore any intermediate events that
|
||||||
|
// come before the reset provided they are EndOfSnapshot events because in many
|
||||||
|
// cases it's non-deterministic whether the snapshot will complete before the
|
||||||
|
// acl reset is handled.
|
||||||
|
func assertReset(t *testing.T, eventCh <-chan nextResult, allowEOS bool) {
|
||||||
|
t.Helper()
|
||||||
|
timeoutCh := time.After(100 * time.Millisecond)
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case next := <-eventCh:
|
||||||
|
if allowEOS {
|
||||||
|
if next.Err == nil && len(next.Events) == 1 && next.Events[0].GetEndOfSnapshot() {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
}
|
||||||
|
require.Error(t, next.Err)
|
||||||
|
require.Equal(t, stream.ErrSubscriptionReload, next.Err)
|
||||||
|
return
|
||||||
|
case <-timeoutCh:
|
||||||
|
t.Fatalf("no err after 100ms")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func createTokenAndWaitForACLEventPublish(t *testing.T, s *Store) *structs.ACLToken {
|
||||||
|
// Token to use during this test.
|
||||||
|
token := &structs.ACLToken{
|
||||||
|
AccessorID: "3af117a9-2233-4cf4-8ff8-3c749c9906b4",
|
||||||
|
SecretID: "4268ce0d-d7ae-4718-8613-42eba9036020",
|
||||||
|
Description: "something",
|
||||||
|
Policies: []structs.ACLTokenPolicyLink{
|
||||||
|
structs.ACLTokenPolicyLink{
|
||||||
|
ID: testPolicyID_A,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
Roles: []structs.ACLTokenRoleLink{
|
||||||
|
structs.ACLTokenRoleLink{
|
||||||
|
ID: testRoleID_B,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
token.SetHash(false)
|
||||||
|
|
||||||
|
// If we subscribe immediately after we create a token we race with the
|
||||||
|
// publisher that is publishing the ACL token event for the token we just
|
||||||
|
// created. That means that the subscription we create right after will often
|
||||||
|
// be immediately reset. The most reliable way to avoid that without just
|
||||||
|
// sleeping for some arbitrary time is to pre-subscribe using the token before
|
||||||
|
// it actually exists (which works because the publisher doesn't check tokens
|
||||||
|
// it assumes something lower down did that) and then wait for it to be reset
|
||||||
|
// so we know the initial token write event has been sent out before
|
||||||
|
// continuing...
|
||||||
|
subscription := &agentpb.SubscribeRequest{
|
||||||
|
Topic: agentpb.Topic_ServiceHealth,
|
||||||
|
Key: "nope",
|
||||||
|
Token: token.SecretID,
|
||||||
|
}
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
|
||||||
|
defer cancel()
|
||||||
|
sub, err := s.publisher.Subscribe(ctx, subscription)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
eventCh := testRunSub(sub)
|
||||||
|
|
||||||
|
// Create the ACL token to be used in the subscription.
|
||||||
|
require.NoError(t, s.ACLTokenSet(2, token.Clone(), false))
|
||||||
|
|
||||||
|
// Wait for the pre-subscription to be reset
|
||||||
|
assertReset(t, eventCh, true)
|
||||||
|
|
||||||
|
return token
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestPublisher_BasicPublish(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
require := require.New(t)
|
||||||
|
s := testStateStore(t)
|
||||||
|
|
||||||
|
// Register an initial instance
|
||||||
|
reg := structs.TestRegisterRequest(t)
|
||||||
|
reg.Service.ID = "web1"
|
||||||
|
require.NoError(s.EnsureRegistration(1, reg))
|
||||||
|
|
||||||
|
// Register the subscription.
|
||||||
|
subscription := &agentpb.SubscribeRequest{
|
||||||
|
Topic: agentpb.Topic_ServiceHealth,
|
||||||
|
Key: reg.Service.Service,
|
||||||
|
}
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
|
||||||
|
defer cancel()
|
||||||
|
sub, err := s.publisher.Subscribe(ctx, subscription)
|
||||||
|
require.NoError(err)
|
||||||
|
|
||||||
|
eventCh := testRunSub(sub)
|
||||||
|
|
||||||
|
// Stream should get the instance and then EndOfSnapshot
|
||||||
|
e := assertEvent(t, eventCh)
|
||||||
|
sh := e.GetServiceHealth()
|
||||||
|
require.NotNil(sh, "expected service health event, got %v", e)
|
||||||
|
e = assertEvent(t, eventCh)
|
||||||
|
require.True(e.GetEndOfSnapshot())
|
||||||
|
|
||||||
|
// Now subscriber should block waiting for updates
|
||||||
|
assertNoEvent(t, eventCh)
|
||||||
|
|
||||||
|
// Add a new instance of service on a different node
|
||||||
|
reg2 := reg
|
||||||
|
reg2.Node = "node2"
|
||||||
|
require.NoError(s.EnsureRegistration(1, reg))
|
||||||
|
|
||||||
|
// Subscriber should see registration
|
||||||
|
e = assertEvent(t, eventCh)
|
||||||
|
sh = e.GetServiceHealth()
|
||||||
|
require.NotNil(sh, "expected service health event, got %v", e)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestPublisher_ACLTokenUpdate(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
require := require.New(t)
|
||||||
|
s := testACLTokensStateStore(t)
|
||||||
|
|
||||||
|
// Setup token and wait for good state
|
||||||
|
token := createTokenAndWaitForACLEventPublish(t, s)
|
||||||
|
|
||||||
|
// Register the subscription.
|
||||||
|
subscription := &agentpb.SubscribeRequest{
|
||||||
|
Topic: agentpb.Topic_ServiceHealth,
|
||||||
|
Key: "nope",
|
||||||
|
Token: token.SecretID,
|
||||||
|
}
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
|
||||||
|
defer cancel()
|
||||||
|
sub, err := s.publisher.Subscribe(ctx, subscription)
|
||||||
|
require.NoError(err)
|
||||||
|
|
||||||
|
eventCh := testRunSub(sub)
|
||||||
|
|
||||||
|
// Stream should get EndOfSnapshot
|
||||||
|
e := assertEvent(t, eventCh)
|
||||||
|
require.True(e.GetEndOfSnapshot())
|
||||||
|
|
||||||
|
// Update an unrelated token.
|
||||||
|
token2 := &structs.ACLToken{
|
||||||
|
AccessorID: "a7bbf480-8440-4f55-acfc-6fdca25cb13e",
|
||||||
|
SecretID: "72e81982-7a0f-491f-a60e-c9c802ac1402",
|
||||||
|
}
|
||||||
|
token2.SetHash(false)
|
||||||
|
require.NoError(s.ACLTokenSet(3, token2.Clone(), false))
|
||||||
|
|
||||||
|
// Ensure there's no reset event.
|
||||||
|
assertNoEvent(t, eventCh)
|
||||||
|
|
||||||
|
// Now update the token used in the subscriber.
|
||||||
|
token3 := &structs.ACLToken{
|
||||||
|
AccessorID: "3af117a9-2233-4cf4-8ff8-3c749c9906b4",
|
||||||
|
SecretID: "4268ce0d-d7ae-4718-8613-42eba9036020",
|
||||||
|
Description: "something else",
|
||||||
|
}
|
||||||
|
token3.SetHash(false)
|
||||||
|
require.NoError(s.ACLTokenSet(4, token3.Clone(), false))
|
||||||
|
|
||||||
|
// Ensure the reset event was sent.
|
||||||
|
err = assertErr(t, eventCh)
|
||||||
|
require.Equal(stream.ErrSubscriptionReload, err)
|
||||||
|
|
||||||
|
// Register another subscription.
|
||||||
|
subscription2 := &agentpb.SubscribeRequest{
|
||||||
|
Topic: agentpb.Topic_ServiceHealth,
|
||||||
|
Key: "nope",
|
||||||
|
Token: token.SecretID,
|
||||||
|
}
|
||||||
|
sub2, err := s.publisher.Subscribe(ctx, subscription2)
|
||||||
|
require.NoError(err)
|
||||||
|
|
||||||
|
eventCh2 := testRunSub(sub2)
|
||||||
|
|
||||||
|
// Expect initial EoS
|
||||||
|
e = assertEvent(t, eventCh2)
|
||||||
|
require.True(e.GetEndOfSnapshot())
|
||||||
|
|
||||||
|
// Delete the unrelated token.
|
||||||
|
require.NoError(s.ACLTokenDeleteByAccessor(5, token2.AccessorID, nil))
|
||||||
|
|
||||||
|
// Ensure there's no reset event.
|
||||||
|
assertNoEvent(t, eventCh2)
|
||||||
|
|
||||||
|
// Delete the token used by the subscriber.
|
||||||
|
require.NoError(s.ACLTokenDeleteByAccessor(6, token.AccessorID, nil))
|
||||||
|
|
||||||
|
// Ensure the reset event was sent.
|
||||||
|
err = assertErr(t, eventCh2)
|
||||||
|
require.Equal(stream.ErrSubscriptionReload, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestPublisher_ACLPolicyUpdate(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
require := require.New(t)
|
||||||
|
s := testACLTokensStateStore(t)
|
||||||
|
|
||||||
|
// Create token and wait for good state
|
||||||
|
token := createTokenAndWaitForACLEventPublish(t, s)
|
||||||
|
|
||||||
|
// Register the subscription.
|
||||||
|
subscription := &agentpb.SubscribeRequest{
|
||||||
|
Topic: agentpb.Topic_ServiceHealth,
|
||||||
|
Key: "nope",
|
||||||
|
Token: token.SecretID,
|
||||||
|
}
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
|
||||||
|
defer cancel()
|
||||||
|
sub, err := s.publisher.Subscribe(ctx, subscription)
|
||||||
|
require.NoError(err)
|
||||||
|
|
||||||
|
eventCh := testRunSub(sub)
|
||||||
|
|
||||||
|
// Ignore the end of snapshot event
|
||||||
|
e := assertEvent(t, eventCh)
|
||||||
|
require.True(e.GetEndOfSnapshot(), "event should be a EoS got %v", e)
|
||||||
|
|
||||||
|
// Update an unrelated policy.
|
||||||
|
policy2 := structs.ACLPolicy{
|
||||||
|
ID: testPolicyID_C,
|
||||||
|
Name: "foo-read",
|
||||||
|
Rules: `node "foo" { policy = "read" }`,
|
||||||
|
Syntax: acl.SyntaxCurrent,
|
||||||
|
Datacenters: []string{"dc1"},
|
||||||
|
}
|
||||||
|
policy2.SetHash(false)
|
||||||
|
require.NoError(s.ACLPolicySet(3, &policy2))
|
||||||
|
|
||||||
|
// Ensure there's no reset event.
|
||||||
|
assertNoEvent(t, eventCh)
|
||||||
|
|
||||||
|
// Now update the policy used in the subscriber.
|
||||||
|
policy3 := structs.ACLPolicy{
|
||||||
|
ID: testPolicyID_A,
|
||||||
|
Name: "node-read",
|
||||||
|
Rules: `node_prefix "" { policy = "write" }`,
|
||||||
|
Syntax: acl.SyntaxCurrent,
|
||||||
|
Datacenters: []string{"dc1"},
|
||||||
|
}
|
||||||
|
policy3.SetHash(false)
|
||||||
|
require.NoError(s.ACLPolicySet(4, &policy3))
|
||||||
|
|
||||||
|
// Ensure the reset event was sent.
|
||||||
|
assertReset(t, eventCh, true)
|
||||||
|
|
||||||
|
// Register another subscription.
|
||||||
|
subscription2 := &agentpb.SubscribeRequest{
|
||||||
|
Topic: agentpb.Topic_ServiceHealth,
|
||||||
|
Key: "nope",
|
||||||
|
Token: token.SecretID,
|
||||||
|
}
|
||||||
|
sub, err = s.publisher.Subscribe(ctx, subscription2)
|
||||||
|
require.NoError(err)
|
||||||
|
|
||||||
|
eventCh = testRunSub(sub)
|
||||||
|
|
||||||
|
// Ignore the end of snapshot event
|
||||||
|
e = assertEvent(t, eventCh)
|
||||||
|
require.True(e.GetEndOfSnapshot(), "event should be a EoS got %v", e)
|
||||||
|
|
||||||
|
// Delete the unrelated policy.
|
||||||
|
require.NoError(s.ACLPolicyDeleteByID(5, testPolicyID_C, nil))
|
||||||
|
|
||||||
|
// Ensure there's no reload event.
|
||||||
|
assertNoEvent(t, eventCh)
|
||||||
|
|
||||||
|
// Delete the policy used by the subscriber.
|
||||||
|
require.NoError(s.ACLPolicyDeleteByID(6, testPolicyID_A, nil))
|
||||||
|
|
||||||
|
// Ensure the reload event was sent.
|
||||||
|
err = assertErr(t, eventCh)
|
||||||
|
require.Equal(stream.ErrSubscriptionReload, err)
|
||||||
|
|
||||||
|
// Register another subscription.
|
||||||
|
subscription3 := &agentpb.SubscribeRequest{
|
||||||
|
Topic: agentpb.Topic_ServiceHealth,
|
||||||
|
Key: "nope",
|
||||||
|
Token: token.SecretID,
|
||||||
|
}
|
||||||
|
sub, err = s.publisher.Subscribe(ctx, subscription3)
|
||||||
|
require.NoError(err)
|
||||||
|
|
||||||
|
eventCh = testRunSub(sub)
|
||||||
|
|
||||||
|
// Ignore the end of snapshot event
|
||||||
|
e = assertEvent(t, eventCh)
|
||||||
|
require.True(e.GetEndOfSnapshot(), "event should be a EoS got %v", e)
|
||||||
|
|
||||||
|
// Now update the policy used in role B, but not directly in the token.
|
||||||
|
policy4 := structs.ACLPolicy{
|
||||||
|
ID: testPolicyID_B,
|
||||||
|
Name: "node-read",
|
||||||
|
Rules: `node_prefix "foo" { policy = "read" }`,
|
||||||
|
Syntax: acl.SyntaxCurrent,
|
||||||
|
Datacenters: []string{"dc1"},
|
||||||
|
}
|
||||||
|
policy4.SetHash(false)
|
||||||
|
require.NoError(s.ACLPolicySet(7, &policy4))
|
||||||
|
|
||||||
|
// Ensure the reset event was sent.
|
||||||
|
assertReset(t, eventCh, true)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestPublisher_ACLRoleUpdate(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
require := require.New(t)
|
||||||
|
s := testACLTokensStateStore(t)
|
||||||
|
|
||||||
|
// Create token and wait for good state
|
||||||
|
token := createTokenAndWaitForACLEventPublish(t, s)
|
||||||
|
|
||||||
|
// Register the subscription.
|
||||||
|
subscription := &agentpb.SubscribeRequest{
|
||||||
|
Topic: agentpb.Topic_ServiceHealth,
|
||||||
|
Key: "nope",
|
||||||
|
Token: token.SecretID,
|
||||||
|
}
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
|
||||||
|
defer cancel()
|
||||||
|
sub, err := s.publisher.Subscribe(ctx, subscription)
|
||||||
|
require.NoError(err)
|
||||||
|
|
||||||
|
eventCh := testRunSub(sub)
|
||||||
|
|
||||||
|
// Stream should get EndOfSnapshot
|
||||||
|
e := assertEvent(t, eventCh)
|
||||||
|
require.True(e.GetEndOfSnapshot())
|
||||||
|
|
||||||
|
// Update an unrelated role (the token has role testRoleID_B).
|
||||||
|
role := structs.ACLRole{
|
||||||
|
ID: testRoleID_A,
|
||||||
|
Name: "unrelated-role",
|
||||||
|
Description: "test",
|
||||||
|
}
|
||||||
|
role.SetHash(false)
|
||||||
|
require.NoError(s.ACLRoleSet(3, &role))
|
||||||
|
|
||||||
|
// Ensure there's no reload event.
|
||||||
|
assertNoEvent(t, eventCh)
|
||||||
|
|
||||||
|
// Now update the role used by the token in the subscriber.
|
||||||
|
role2 := structs.ACLRole{
|
||||||
|
ID: testRoleID_B,
|
||||||
|
Name: "my-new-role",
|
||||||
|
Description: "changed",
|
||||||
|
}
|
||||||
|
role2.SetHash(false)
|
||||||
|
require.NoError(s.ACLRoleSet(4, &role2))
|
||||||
|
|
||||||
|
// Ensure the reload event was sent.
|
||||||
|
assertReset(t, eventCh, false)
|
||||||
|
|
||||||
|
// Register another subscription.
|
||||||
|
subscription2 := &agentpb.SubscribeRequest{
|
||||||
|
Topic: agentpb.Topic_ServiceHealth,
|
||||||
|
Key: "nope",
|
||||||
|
Token: token.SecretID,
|
||||||
|
}
|
||||||
|
sub, err = s.publisher.Subscribe(ctx, subscription2)
|
||||||
|
require.NoError(err)
|
||||||
|
|
||||||
|
eventCh = testRunSub(sub)
|
||||||
|
|
||||||
|
// Ignore the end of snapshot event
|
||||||
|
e = assertEvent(t, eventCh)
|
||||||
|
require.True(e.GetEndOfSnapshot(), "event should be a EoS got %v", e)
|
||||||
|
|
||||||
|
// Delete the unrelated policy.
|
||||||
|
require.NoError(s.ACLRoleDeleteByID(5, testRoleID_A, nil))
|
||||||
|
|
||||||
|
// Ensure there's no reload event.
|
||||||
|
assertNoEvent(t, eventCh)
|
||||||
|
|
||||||
|
// Delete the policy used by the subscriber.
|
||||||
|
require.NoError(s.ACLRoleDeleteByID(6, testRoleID_B, nil))
|
||||||
|
|
||||||
|
// Ensure the reload event was sent.
|
||||||
|
assertReset(t, eventCh, false)
|
||||||
|
}
|
|
@ -0,0 +1,49 @@
|
||||||
|
package state
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/hashicorp/consul/agent/agentpb"
|
||||||
|
"github.com/hashicorp/consul/agent/consul/stream"
|
||||||
|
memdb "github.com/hashicorp/go-memdb"
|
||||||
|
)
|
||||||
|
|
||||||
|
// unboundSnapFn is a stream.SnapFn with state store as the first argument. This
|
||||||
|
// is bound to a concrete state store instance in the EventPublisher on startup.
|
||||||
|
type unboundSnapFn func(*Store, *agentpb.SubscribeRequest, *stream.EventBuffer) (uint64, error)
|
||||||
|
type unboundProcessChangesFn func(*Store, *txnWrapper, memdb.Changes) ([]agentpb.Event, error)
|
||||||
|
|
||||||
|
// topicHandlers describes the methods needed to process a streaming
|
||||||
|
// subscription for a given topic.
|
||||||
|
type topicHandlers struct {
|
||||||
|
Snapshot unboundSnapFn
|
||||||
|
ProcessChanges unboundProcessChangesFn
|
||||||
|
}
|
||||||
|
|
||||||
|
// topicRegistry is a map of topic handlers. It must only be written to during
|
||||||
|
// init().
|
||||||
|
var topicRegistry map[agentpb.Topic]topicHandlers
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
topicRegistry = map[agentpb.Topic]topicHandlers{
|
||||||
|
agentpb.Topic_ServiceHealth: topicHandlers{
|
||||||
|
Snapshot: (*Store).ServiceHealthSnapshot,
|
||||||
|
ProcessChanges: (*Store).ServiceHealthEventsFromChanges,
|
||||||
|
},
|
||||||
|
agentpb.Topic_ServiceHealthConnect: topicHandlers{
|
||||||
|
Snapshot: (*Store).ServiceHealthConnectSnapshot,
|
||||||
|
// Note there is no ProcessChanges since Connect events are published by
|
||||||
|
// the same event publisher as regular health events to avoid duplicating
|
||||||
|
// lots of filtering on every commit.
|
||||||
|
},
|
||||||
|
// For now we don't actually support subscribing to ACL* topics externally
|
||||||
|
// so these have no Snapshot methods yet. We do need to have a
|
||||||
|
// ProcessChanges func to publish the partial events on ACL changes though
|
||||||
|
// so that we can invalidate other subscriptions if their effective ACL
|
||||||
|
// permissions change.
|
||||||
|
agentpb.Topic_ACLTokens: topicHandlers{
|
||||||
|
ProcessChanges: (*Store).ACLEventsFromChanges,
|
||||||
|
},
|
||||||
|
// Note no ACLPolicies/ACLRoles defined yet because we publish all events
|
||||||
|
// from one handler to save on iterating/filtering and duplicating code and
|
||||||
|
// there are no snapshots for these yet per comment above.
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,248 @@
|
||||||
|
package stream
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"errors"
|
||||||
|
"sync/atomic"
|
||||||
|
|
||||||
|
"github.com/hashicorp/consul/agent/agentpb"
|
||||||
|
)
|
||||||
|
|
||||||
|
// EventBuffer is a single-writer, multiple-reader, unlimited length concurrent
|
||||||
|
// buffer of events that have been published on a topic. The buffer is
|
||||||
|
// effectively just the head of an atomically updated single-linked list. Atomic
|
||||||
|
// accesses are usually to be suspected as premature optimization but this
|
||||||
|
// specifc design has several important features that significantly simplify a
|
||||||
|
// lot of our PubSub machinery.
|
||||||
|
//
|
||||||
|
// The Buffer itself only ever tracks the most recent set of events published so
|
||||||
|
// if there are no consumers older events are automatically garbage collected.
|
||||||
|
// Notification of new events is done by closing a channel on the previous head
|
||||||
|
// alowing efficient broadcast to many watchers without having to run multile
|
||||||
|
// goroutines or deliver to O(N) separate channels.
|
||||||
|
//
|
||||||
|
// Because it's a linked list with atomically updated pointers, readers don't
|
||||||
|
// have to take a lock and can consume at their own pace. but we also don't have
|
||||||
|
// to have a fixed limit on the number of items which either means we don't have
|
||||||
|
// to trade off buffer length config to balance using lots of memory wastefully
|
||||||
|
// vs handling occasional slow readers.
|
||||||
|
//
|
||||||
|
// The buffer is used to deliver all messages broadcast toa topic for active
|
||||||
|
// subscribers to consume, but it is also an effective way to both deliver and
|
||||||
|
// optionally cache snapshots per topic and key. byt using an EventBuffer,
|
||||||
|
// snapshot functions don't have to read the whole snapshot into memory before
|
||||||
|
// delivery - they can stream from memdb. However simply by storing a pointer to
|
||||||
|
// the first event in the buffer, we can cache the buffered events for future
|
||||||
|
// watchers on the same topic. Finally, once we've delivered all the snapshot
|
||||||
|
// events to the buffer, we can append a next-element which is the first topic
|
||||||
|
// buffer element with a higher index and so consuers can just keep reading the
|
||||||
|
// same buffer.
|
||||||
|
//
|
||||||
|
// A huge benefit here is that caching snapshots becomes very simple - we don't
|
||||||
|
// have to do any additional book keeping to figure out when to truncate the
|
||||||
|
// topic buffer to make sure the snapshot is still usable or run into issues
|
||||||
|
// where the cached snapshot is no longer useful since the buffer will keep
|
||||||
|
// elements around only as long as either the cache or a subscriber need them.
|
||||||
|
// So we can use whatever simple timeout logic we like to decide how long to
|
||||||
|
// keep caches (or if we should keep them at all) and the buffers will
|
||||||
|
// automatically keep the events we need to make that work for exactly the
|
||||||
|
// optimal amount of time and no longer.
|
||||||
|
//
|
||||||
|
// A new buffer is constructed with a sentinel "empty" BufferItem that has a nil
|
||||||
|
// Events array. This enables subscribers to start watching for the next update
|
||||||
|
// immediately.
|
||||||
|
//
|
||||||
|
// The zero value EventBuffer is _not_ a usable type since it has not been
|
||||||
|
// initialized with an empty bufferItem so can't be used to wait for the first
|
||||||
|
// published event. Call NewEventBuffer to construct a new buffer.
|
||||||
|
//
|
||||||
|
// Calls to Append or AppendBuffer that mutate the head must be externally
|
||||||
|
// synchronized. This allows systems that already serialize writes to append
|
||||||
|
// without lock overhead (e.g. a snapshot goroutine appending thousands of
|
||||||
|
// events).
|
||||||
|
type EventBuffer struct {
|
||||||
|
head atomic.Value
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewEventBuffer creates an EventBuffer ready for use.
|
||||||
|
func NewEventBuffer() *EventBuffer {
|
||||||
|
b := &EventBuffer{}
|
||||||
|
b.head.Store(NewBufferItem())
|
||||||
|
return b
|
||||||
|
}
|
||||||
|
|
||||||
|
// Append a set of events from one raft operation to the buffer and notify
|
||||||
|
// watchers. Note that events must not have been previously made available to
|
||||||
|
// any other goroutine since we may mutate them to ensure ACL Rules are
|
||||||
|
// populated. After calling append, the caller must not make any further
|
||||||
|
// mutations to the events as they may have been exposed to subscribers in other
|
||||||
|
// goroutines. Append only supports a single concurrent caller and must be
|
||||||
|
// externally synchronized with other Append, AppendBuffer or AppendErr calls.
|
||||||
|
func (b *EventBuffer) Append(events []agentpb.Event) {
|
||||||
|
// Push events to the head
|
||||||
|
it := NewBufferItem()
|
||||||
|
it.Events = events
|
||||||
|
b.AppendBuffer(it)
|
||||||
|
}
|
||||||
|
|
||||||
|
// AppendBuffer joins another buffer which may be the tail of a separate buffer
|
||||||
|
// for example a buffer that's had the events from a snapshot appended may
|
||||||
|
// finally by linked to the topic buffer for the subsequent events so
|
||||||
|
// subscribers can seamlessly consume the updates. Note that Events in item must
|
||||||
|
// already be fully populated with ACL rules and must not be mutated further as
|
||||||
|
// they may have already been published to subscribers.
|
||||||
|
//
|
||||||
|
// AppendBuffer only supports a single concurrent caller and must be externally
|
||||||
|
// synchronized with other Append, AppendBuffer or AppendErr calls.
|
||||||
|
func (b *EventBuffer) AppendBuffer(item *BufferItem) {
|
||||||
|
// First store it as the next node for the old head this ensures once it's
|
||||||
|
// visible to new searchers the linked list is already valid. Not sure it
|
||||||
|
// matters but this seems nicer.
|
||||||
|
oldHead := b.Head()
|
||||||
|
oldHead.link.next.Store(item)
|
||||||
|
b.head.Store(item)
|
||||||
|
|
||||||
|
// Now it's added invalidate the oldHead to notify waiters
|
||||||
|
close(oldHead.link.ch)
|
||||||
|
// don't set chan to nil since that will race with readers accessing it.
|
||||||
|
}
|
||||||
|
|
||||||
|
// AppendErr publishes an error result to the end of the buffer. This is
|
||||||
|
// considered terminal and will cause all subscribers to end their current
|
||||||
|
// streaming subscription and return the error. AppendErr only supports a
|
||||||
|
// single concurrent caller and must be externally synchronized with other
|
||||||
|
// Append, AppendBuffer or AppendErr calls.
|
||||||
|
func (b *EventBuffer) AppendErr(err error) {
|
||||||
|
b.AppendBuffer(&BufferItem{Err: err})
|
||||||
|
}
|
||||||
|
|
||||||
|
// Head returns the current head of the buffer. It will always exist but it may
|
||||||
|
// be a "sentinel" empty item with a nil Events slice to allow consumers to
|
||||||
|
// watch for the next update. Consumers should always check for empty Events and
|
||||||
|
// treat them as no-ops. Will panic if EventBuffer was not initialized correctly
|
||||||
|
// with EventBuffer.
|
||||||
|
func (b *EventBuffer) Head() *BufferItem {
|
||||||
|
return b.head.Load().(*BufferItem)
|
||||||
|
}
|
||||||
|
|
||||||
|
// BufferItem represents a set of events published by a single raft operation.
|
||||||
|
// The first item returned by a newly constructed buffer will have nil Events
|
||||||
|
// and should be considered a "sentinel" value just useful for waiting on the
|
||||||
|
// next events via Next.
|
||||||
|
//
|
||||||
|
// To iterate to the next event, a Next method may be called which may block if
|
||||||
|
// there is no next element yet.
|
||||||
|
//
|
||||||
|
// Holding a pointer to the item keeps all the events published since in memory
|
||||||
|
// so it's important that subscribers don't hold pointers to buffer items after
|
||||||
|
// they have been delivered except where it's intentional to maintain a cache or
|
||||||
|
// trailing store of events for performance reasons.
|
||||||
|
//
|
||||||
|
// Subscribers must not mutate the BufferItem or the Events or Encoded payloads
|
||||||
|
// inside as these are shared between all readers.
|
||||||
|
type BufferItem struct {
|
||||||
|
// Events is the set of events published at one raft index. This may be nil as
|
||||||
|
// a sentinel value to allow watching for the first event in a buffer. Callers
|
||||||
|
// should check and skip nil Events at any point in the buffer. It will also
|
||||||
|
// be nil if the producer appends an Error event because they can't complete
|
||||||
|
// the request to populate the buffer. Err will be non-nil in this case.
|
||||||
|
Events []agentpb.Event
|
||||||
|
|
||||||
|
// Err is non-nil if the producer can't complete their task and terminates the
|
||||||
|
// buffer. Subscribers should return the error to clients and cease attempting
|
||||||
|
// to read from the buffer.
|
||||||
|
Err error
|
||||||
|
|
||||||
|
// link holds the next pointer and channel. This extra bit of indirection
|
||||||
|
// allows us to splice buffers together at arbitrary points without including
|
||||||
|
// events in one buffer just for the side-effect of watching for the next set.
|
||||||
|
// The link may not be mutated once the event is appended to a buffer.
|
||||||
|
link *bufferLink
|
||||||
|
}
|
||||||
|
|
||||||
|
type bufferLink struct {
|
||||||
|
// next is an atomically updated pointer to the next event in the buffer. It
|
||||||
|
// is written exactly once by the single published and will always be set if
|
||||||
|
// ch is closed.
|
||||||
|
next atomic.Value
|
||||||
|
|
||||||
|
// ch is closed when the next event is published. It should never be mutated
|
||||||
|
// (e.g. set to nil) as that is racey, but is closed once when the next event
|
||||||
|
// is published. the next pointer will have been set by the time this is
|
||||||
|
// closed.
|
||||||
|
ch chan struct{}
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewBufferItem returns a blank buffer item with a link and chan ready to have
|
||||||
|
// the fields set and be appended to a buffer.
|
||||||
|
func NewBufferItem() *BufferItem {
|
||||||
|
return &BufferItem{
|
||||||
|
link: &bufferLink{
|
||||||
|
ch: make(chan struct{}),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Next return the next buffer item in the buffer. It may block until ctx is
|
||||||
|
// cancelled or until the next item is published.
|
||||||
|
func (i *BufferItem) Next(ctx context.Context) (*BufferItem, error) {
|
||||||
|
// See if there is already a next value, block if so. Note we don't rely on
|
||||||
|
// state change (chan nil) as that's not threadsafe but detecting close is.
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return nil, ctx.Err()
|
||||||
|
case <-i.link.ch:
|
||||||
|
}
|
||||||
|
|
||||||
|
// If channel closed, there must be a next item to read
|
||||||
|
nextRaw := i.link.next.Load()
|
||||||
|
if nextRaw == nil {
|
||||||
|
// shouldn't be possible
|
||||||
|
return nil, errors.New("invalid next item")
|
||||||
|
}
|
||||||
|
next := nextRaw.(*BufferItem)
|
||||||
|
if next.Err != nil {
|
||||||
|
return nil, next.Err
|
||||||
|
}
|
||||||
|
if len(next.Events) == 0 {
|
||||||
|
// Skip this event
|
||||||
|
return next.Next(ctx)
|
||||||
|
}
|
||||||
|
return next, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// NextNoBlock returns the next item in the buffer without blocking. If it
|
||||||
|
// reaches the most recent item it will return nil and no error.
|
||||||
|
func (i *BufferItem) NextNoBlock() (*BufferItem, error) {
|
||||||
|
nextRaw := i.link.next.Load()
|
||||||
|
if nextRaw == nil {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
next := nextRaw.(*BufferItem)
|
||||||
|
if next.Err != nil {
|
||||||
|
return nil, next.Err
|
||||||
|
}
|
||||||
|
if len(next.Events) == 0 {
|
||||||
|
// Skip this event
|
||||||
|
return next.NextNoBlock()
|
||||||
|
}
|
||||||
|
return next, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// FollowAfter returns either the next item in the buffer if there is already
|
||||||
|
// one, or if not it returns an empty item (that will be ignored by subscribers)
|
||||||
|
// that has the same link as the current buffer so that it will be notified of
|
||||||
|
// future updates in the buffer without including the current item.
|
||||||
|
func (i *BufferItem) FollowAfter() (*BufferItem, error) {
|
||||||
|
next, err := i.NextNoBlock()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if next == nil {
|
||||||
|
// Return an empty item that can be followed to the next item published.
|
||||||
|
item := &BufferItem{}
|
||||||
|
item.link = i.link
|
||||||
|
return item, nil
|
||||||
|
}
|
||||||
|
return next, nil
|
||||||
|
}
|
|
@ -0,0 +1,89 @@
|
||||||
|
package stream
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
fmt "fmt"
|
||||||
|
"math/rand"
|
||||||
|
"testing"
|
||||||
|
time "time"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
|
||||||
|
"github.com/hashicorp/consul/agent/agentpb"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestEventBufferFuzz(t *testing.T) {
|
||||||
|
// A property-based test to ensure that under heavy concurrent use trivial
|
||||||
|
// correctness properties are not violated (and that -race doesn't complain).
|
||||||
|
|
||||||
|
nReaders := 1000
|
||||||
|
nMessages := 1000
|
||||||
|
|
||||||
|
b := NewEventBuffer()
|
||||||
|
|
||||||
|
// Start a write goroutine that will publish 10000 messages with sequential
|
||||||
|
// indexes and some jitter in timing (to allow clients to "catch up" and block
|
||||||
|
// waiting for updates).
|
||||||
|
go func() {
|
||||||
|
// z is a Zipfian distribution that gives us a number of milliseconds to
|
||||||
|
// sleep which are mostly low - near zero but occasionally spike up to near
|
||||||
|
// 100.
|
||||||
|
z := rand.NewZipf(rand.New(rand.NewSource(1)), 1.5, 1.5, 50)
|
||||||
|
|
||||||
|
for i := 0; i < nMessages; i++ {
|
||||||
|
// Event content is arbitrary and not valid for our use of buffers in
|
||||||
|
// streaming - here we only care about the semantics of the buffer.
|
||||||
|
e := agentpb.Event{
|
||||||
|
Index: uint64(i), // Indexes should be contiguous
|
||||||
|
Topic: agentpb.Topic_ServiceHealth,
|
||||||
|
Payload: &agentpb.Event_EndOfSnapshot{
|
||||||
|
EndOfSnapshot: true,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
b.Append([]agentpb.Event{e})
|
||||||
|
// Sleep sometimes for a while to let some subscribers catch up
|
||||||
|
wait := time.Duration(z.Uint64()) * time.Millisecond
|
||||||
|
time.Sleep(wait)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
// Run n subscribers following and verifying
|
||||||
|
errCh := make(chan error, nReaders)
|
||||||
|
|
||||||
|
// Load head here so all subscribers start from the same point or they might
|
||||||
|
// no run until several appends have already happened.
|
||||||
|
head := b.Head()
|
||||||
|
|
||||||
|
for i := 0; i < nReaders; i++ {
|
||||||
|
go func(i int) {
|
||||||
|
expect := uint64(0)
|
||||||
|
item := head
|
||||||
|
var err error
|
||||||
|
for {
|
||||||
|
item, err = item.Next(context.Background())
|
||||||
|
if err != nil {
|
||||||
|
errCh <- fmt.Errorf("subscriber %05d failed getting next %d: %s", i,
|
||||||
|
expect, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if item.Events[0].Index != expect {
|
||||||
|
errCh <- fmt.Errorf("subscriber %05d got bad event want=%d, got=%d", i,
|
||||||
|
expect, item.Events[0].Index)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
expect++
|
||||||
|
if expect == uint64(nMessages) {
|
||||||
|
// Succeeded
|
||||||
|
errCh <- nil
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}(i)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Wait for all readers to finish one way or other
|
||||||
|
for i := 0; i < nReaders; i++ {
|
||||||
|
err := <-errCh
|
||||||
|
assert.NoError(t, err)
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,137 @@
|
||||||
|
package stream
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/hashicorp/consul/agent/agentpb"
|
||||||
|
)
|
||||||
|
|
||||||
|
// EventSnapshot represents the state of memdb for a given topic and key at some
|
||||||
|
// point in time. It is modelled as a buffer of events so that snapshots can be
|
||||||
|
// streamed to possibly multiple subscribers concurrently, and can be trivially
|
||||||
|
// cached by just keeping the Snapshot around. Once the EventSnapshot is dropped
|
||||||
|
// from memory, any subscribers still reading from it may do so by following
|
||||||
|
// their pointers but eventually the snapshot is garbage collected automatically
|
||||||
|
// by Go's runtime, simplifying snapshot and buffer management dramatically.
|
||||||
|
type EventSnapshot struct {
|
||||||
|
// Request that this snapshot satisfies.
|
||||||
|
Request *agentpb.SubscribeRequest
|
||||||
|
|
||||||
|
// Snap is the first item in the buffer containing the snapshot. Once the
|
||||||
|
// snapshot is complete, subsequent update's BufferItems are appended such
|
||||||
|
// that subscribers just need to follow this buffer for the duration of their
|
||||||
|
// subscription stream.
|
||||||
|
Snap *BufferItem
|
||||||
|
|
||||||
|
// snapBuffer is the Head of the snapshot buffer the fn should write to.
|
||||||
|
snapBuffer *EventBuffer
|
||||||
|
|
||||||
|
// topicBufferHead stored the current most-recent published item from before
|
||||||
|
// the snapshot was taken such that anything published during snapshot
|
||||||
|
// publishing can be captured.
|
||||||
|
topicBufferHead *BufferItem
|
||||||
|
|
||||||
|
// SnapFn is the function that will make the snapshot for this request.
|
||||||
|
fn SnapFn
|
||||||
|
}
|
||||||
|
|
||||||
|
// SnapFn is the type of function needed to generate a snapshot for a topic and
|
||||||
|
// key.
|
||||||
|
type SnapFn func(req *agentpb.SubscribeRequest, buf *EventBuffer) (uint64, error)
|
||||||
|
|
||||||
|
// NewEventSnapshot creates a snapshot buffer based on the subscription request.
|
||||||
|
// The current buffer head for the topic in question is passed so that once the
|
||||||
|
// snapshot is complete and has been delivered into the buffer, any events
|
||||||
|
// published during snapshotting can be immediately appended and won't be
|
||||||
|
// missed. Once the snapshot is delivered the topic buffer is spliced onto the
|
||||||
|
// snapshot buffer so that subscribers will naturally follow from the snapshot
|
||||||
|
// to wait for any subsequent updates.
|
||||||
|
func NewEventSnapshot(req *agentpb.SubscribeRequest, topicBufferHead *BufferItem, fn SnapFn) *EventSnapshot {
|
||||||
|
buf := NewEventBuffer()
|
||||||
|
s := &EventSnapshot{
|
||||||
|
Request: req,
|
||||||
|
Snap: buf.Head(),
|
||||||
|
snapBuffer: buf,
|
||||||
|
topicBufferHead: topicBufferHead,
|
||||||
|
fn: fn,
|
||||||
|
}
|
||||||
|
go s.doSnapshot()
|
||||||
|
return s
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *EventSnapshot) doSnapshot() {
|
||||||
|
// Call snapshot func
|
||||||
|
idx, err := s.fn(s.Request, s.snapBuffer)
|
||||||
|
if err != nil {
|
||||||
|
// Append an error result to signal to subscribers that this snapshot is no
|
||||||
|
// good.
|
||||||
|
s.snapBuffer.AppendErr(err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// We wrote the snapshot events to the buffer, send the "end of snapshot" event
|
||||||
|
s.snapBuffer.Append([]agentpb.Event{agentpb.Event{
|
||||||
|
Topic: s.Request.Topic,
|
||||||
|
Key: s.Request.Key,
|
||||||
|
Index: idx,
|
||||||
|
Payload: &agentpb.Event_EndOfSnapshot{
|
||||||
|
EndOfSnapshot: true,
|
||||||
|
},
|
||||||
|
}})
|
||||||
|
|
||||||
|
// Now splice on the topic buffer. We need to iterate through the buffer to
|
||||||
|
// find the first event after the current snapshot.
|
||||||
|
item := s.topicBufferHead
|
||||||
|
for {
|
||||||
|
// Find the next item that we should include.
|
||||||
|
next, err := item.NextNoBlock()
|
||||||
|
if err != nil {
|
||||||
|
// Append an error result to signal to subscribers that this snapshot is
|
||||||
|
// no good.
|
||||||
|
s.snapBuffer.AppendErr(err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if next == nil {
|
||||||
|
// This is the head of the topic buffer (or was just now which is after
|
||||||
|
// the snapshot completed). We don't want any of the events (if any) in
|
||||||
|
// the snapshot buffer as they came before the snapshot but we do need to
|
||||||
|
// wait for the next update.
|
||||||
|
follow, err := item.FollowAfter()
|
||||||
|
if err != nil {
|
||||||
|
s.snapBuffer.AppendErr(err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
s.snapBuffer.AppendBuffer(follow)
|
||||||
|
// We are done, subscribers will now follow future updates to the topic
|
||||||
|
// after reading the snapshot events.
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if next.Err != nil {
|
||||||
|
s.snapBuffer.AppendErr(next.Err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(next.Events) > 0 {
|
||||||
|
if next.Events[0].Index > idx {
|
||||||
|
// We've found an update in the topic buffer that happened after our
|
||||||
|
// snapshot was taken, splice it into the snapshot buffer so subscribers
|
||||||
|
// can continue to read this and others after it.
|
||||||
|
s.snapBuffer.AppendBuffer(next)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// We don't need this item, continue to next
|
||||||
|
item = next
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Err returns an error if the snapshot func has failed with an error or nil
|
||||||
|
// otherwise. Nil doesn't necessarily mean there won't be an error but there
|
||||||
|
// hasn't been one yet.
|
||||||
|
func (s *EventSnapshot) Err() error {
|
||||||
|
// Fetch the head of the buffer, this is atomic. If the snapshot func errored
|
||||||
|
// then the last event will be an error.
|
||||||
|
head := s.snapBuffer.Head()
|
||||||
|
return head.Err
|
||||||
|
}
|
|
@ -0,0 +1,191 @@
|
||||||
|
package stream
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
fmt "fmt"
|
||||||
|
"testing"
|
||||||
|
time "time"
|
||||||
|
|
||||||
|
"github.com/hashicorp/consul/agent/agentpb"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestEventSnapshot(t *testing.T) {
|
||||||
|
// Setup a dummy state that we can manipulate easily. The properties we care
|
||||||
|
// about are that we publish some sequence of events as a snapshot and then
|
||||||
|
// follow them up with "live updates". We control the interleavings. Our state
|
||||||
|
// consists of health events (only type fully defined so far) for service
|
||||||
|
// instances with consecutive ID numbers starting from 0 (e.g. test-000,
|
||||||
|
// test-001). The snapshot is delivered at index 1000. updatesBeforeSnap
|
||||||
|
// controls how many updates are delivered _before_ the snapshot is complete
|
||||||
|
// (with an index < 1000). updatesBeforeSnap controls the number of updates
|
||||||
|
// delivered after (index > 1000).
|
||||||
|
//
|
||||||
|
// In all cases the invariant should be that we end up with all of the
|
||||||
|
// instances in the snapshot, plus any delivered _after_ the snapshot index,
|
||||||
|
// but none delivered _before_ the snapshot index otherwise we may have an
|
||||||
|
// inconsistent snapshot.
|
||||||
|
cases := []struct {
|
||||||
|
name string
|
||||||
|
snapshotSize int
|
||||||
|
updatesBeforeSnap int
|
||||||
|
updatesAfterSnap int
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "snapshot with subsequent mutations",
|
||||||
|
snapshotSize: 10,
|
||||||
|
updatesBeforeSnap: 0,
|
||||||
|
updatesAfterSnap: 10,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "snapshot with concurrent mutations",
|
||||||
|
snapshotSize: 10,
|
||||||
|
updatesBeforeSnap: 5,
|
||||||
|
updatesAfterSnap: 5,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "empty snapshot with subsequent mutations",
|
||||||
|
snapshotSize: 0,
|
||||||
|
updatesBeforeSnap: 0,
|
||||||
|
updatesAfterSnap: 10,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "empty snapshot with concurrent mutations",
|
||||||
|
snapshotSize: 0,
|
||||||
|
updatesBeforeSnap: 5,
|
||||||
|
updatesAfterSnap: 5,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
snapIndex := uint64(1000)
|
||||||
|
|
||||||
|
for _, tc := range cases {
|
||||||
|
tc := tc
|
||||||
|
t.Run(tc.name, func(t *testing.T) {
|
||||||
|
require.True(t, tc.updatesBeforeSnap < 999,
|
||||||
|
"bad test param updatesBeforeSnap must be less than the snapshot"+
|
||||||
|
" index (%d) minus one (%d), got: %d", snapIndex, snapIndex-1,
|
||||||
|
tc.updatesBeforeSnap)
|
||||||
|
|
||||||
|
// Create a snapshot func that will deliver registration events.
|
||||||
|
snFn := testHealthConsecutiveSnapshotFn(tc.snapshotSize, snapIndex)
|
||||||
|
|
||||||
|
// Create a topic buffer for updates
|
||||||
|
tb := NewEventBuffer()
|
||||||
|
|
||||||
|
// Capture the topic buffer head now so updatesBeforeSnap are "concurrent"
|
||||||
|
// and are seen by the EventSnapshot once it completes the snap.
|
||||||
|
tbHead := tb.Head()
|
||||||
|
|
||||||
|
// Deliver any pre-snapshot events simulating updates that occur after the
|
||||||
|
// topic buffer is captured during a Subscribe call, but before the
|
||||||
|
// snapshot is made of the FSM.
|
||||||
|
for i := tc.updatesBeforeSnap; i > 0; i-- {
|
||||||
|
index := snapIndex - uint64(i)
|
||||||
|
// Use an instance index that's unique and should never appear in the
|
||||||
|
// output so we can be sure these were not included as they came before
|
||||||
|
// the snapshot.
|
||||||
|
tb.Append([]agentpb.Event{testHealthEvent(index, 10000+i)})
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create EventSnapshot, (will call snFn in another goroutine). The
|
||||||
|
// Request is ignored by the SnapFn so doesn't matter for now.
|
||||||
|
es := NewEventSnapshot(&agentpb.SubscribeRequest{}, tbHead, snFn)
|
||||||
|
|
||||||
|
// Deliver any post-snapshot events simulating updates that occur
|
||||||
|
// logically after snapshot. It doesn't matter that these might actually
|
||||||
|
// be appended before the snapshot fn executes in another goroutine since
|
||||||
|
// it's operating an a possible stale "snapshot". This is the same as
|
||||||
|
// reality with the state store where updates that occur after the
|
||||||
|
// snapshot is taken but while the SnapFnis still running must be captured
|
||||||
|
// correctly.
|
||||||
|
for i := 0; i < tc.updatesAfterSnap; i++ {
|
||||||
|
index := snapIndex + 1 + uint64(i)
|
||||||
|
// Use an instance index that's unique.
|
||||||
|
tb.Append([]agentpb.Event{testHealthEvent(index, 20000+i)})
|
||||||
|
}
|
||||||
|
|
||||||
|
// Now read the snapshot buffer until we've received everything we expect.
|
||||||
|
// Don't wait too long in case we get stuck.
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
snapIDs := make([]string, 0, tc.snapshotSize)
|
||||||
|
updateIDs := make([]string, 0, tc.updatesAfterSnap)
|
||||||
|
snapDone := false
|
||||||
|
curItem := es.Snap
|
||||||
|
var err error
|
||||||
|
RECV:
|
||||||
|
for {
|
||||||
|
curItem, err = curItem.Next(ctx)
|
||||||
|
// This error is typically timeout so dump the state to aid debugging.
|
||||||
|
require.NoError(t, err,
|
||||||
|
"current state: snapDone=%v snapIDs=%s updateIDs=%s", snapDone,
|
||||||
|
snapIDs, updateIDs)
|
||||||
|
e := curItem.Events[0]
|
||||||
|
if snapDone {
|
||||||
|
sh := e.GetServiceHealth()
|
||||||
|
require.NotNil(t, sh, "want health event got: %#v", e.Payload)
|
||||||
|
updateIDs = append(updateIDs, sh.CheckServiceNode.Service.ID)
|
||||||
|
if len(updateIDs) == tc.updatesAfterSnap {
|
||||||
|
// We're done!
|
||||||
|
break RECV
|
||||||
|
}
|
||||||
|
} else if e.GetEndOfSnapshot() {
|
||||||
|
snapDone = true
|
||||||
|
} else {
|
||||||
|
sh := e.GetServiceHealth()
|
||||||
|
require.NotNil(t, sh, "want health event got: %#v", e.Payload)
|
||||||
|
snapIDs = append(snapIDs, sh.CheckServiceNode.Service.ID)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Validate the event IDs we got delivered.
|
||||||
|
require.Equal(t, genSequentialIDs(0, tc.snapshotSize), snapIDs)
|
||||||
|
require.Equal(t, genSequentialIDs(20000, 20000+tc.updatesAfterSnap), updateIDs)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func genSequentialIDs(start, end int) []string {
|
||||||
|
ids := make([]string, 0, end-start)
|
||||||
|
for i := start; i < end; i++ {
|
||||||
|
ids = append(ids, fmt.Sprintf("test-%03d", i))
|
||||||
|
}
|
||||||
|
return ids
|
||||||
|
}
|
||||||
|
|
||||||
|
func testHealthConsecutiveSnapshotFn(size int, index uint64) SnapFn {
|
||||||
|
return func(req *agentpb.SubscribeRequest, buf *EventBuffer) (uint64, error) {
|
||||||
|
for i := 0; i < size; i++ {
|
||||||
|
// Event content is arbitrary we are just using Health because it's the
|
||||||
|
// first type defined. We just want a set of things with consecutive
|
||||||
|
// names.
|
||||||
|
buf.Append([]agentpb.Event{testHealthEvent(index, i)})
|
||||||
|
}
|
||||||
|
return index, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func testHealthEvent(index uint64, n int) agentpb.Event {
|
||||||
|
return agentpb.Event{
|
||||||
|
Index: index,
|
||||||
|
Topic: agentpb.Topic_ServiceHealth,
|
||||||
|
Payload: &agentpb.Event_ServiceHealth{
|
||||||
|
ServiceHealth: &agentpb.ServiceHealthUpdate{
|
||||||
|
Op: agentpb.CatalogOp_Register,
|
||||||
|
CheckServiceNode: &agentpb.CheckServiceNode{
|
||||||
|
Node: &agentpb.Node{
|
||||||
|
Node: "n1",
|
||||||
|
Address: "10.10.10.10",
|
||||||
|
},
|
||||||
|
Service: &agentpb.NodeService{
|
||||||
|
ID: fmt.Sprintf("test-%03d", n),
|
||||||
|
Service: "test",
|
||||||
|
Port: 8080,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,128 @@
|
||||||
|
package stream
|
||||||
|
|
||||||
|
import (
|
||||||
|
context "context"
|
||||||
|
"errors"
|
||||||
|
"sync/atomic"
|
||||||
|
|
||||||
|
"github.com/hashicorp/consul/agent/agentpb"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
// SubscriptionStateOpen is the default state of a subscription
|
||||||
|
SubscriptionStateOpen uint32 = 0
|
||||||
|
|
||||||
|
// SubscriptionStateCloseReload signals that the subscription was closed by
|
||||||
|
// server and client should retry.
|
||||||
|
SubscriptionStateCloseReload uint32 = 1
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
// ErrSubscriptionReload is a error signalling reload event should be sent to
|
||||||
|
// the client and the server should close.
|
||||||
|
ErrSubscriptionReload = errors.New("subscription closed by server, client should retry")
|
||||||
|
)
|
||||||
|
|
||||||
|
// Subscription holds state about a single Subscribe call. Subscribe clients
|
||||||
|
// access their next event by calling Next(). This may initially include the
|
||||||
|
// snapshot events to catch them up if they are new or behind.
|
||||||
|
type Subscription struct {
|
||||||
|
// state is accessed atomically 0 means open, 1 means closed with reload
|
||||||
|
state uint32
|
||||||
|
|
||||||
|
// req is the requests that we are responding to
|
||||||
|
req *agentpb.SubscribeRequest
|
||||||
|
|
||||||
|
// currentItem stores the current snapshot or topic buffer item we are on. It
|
||||||
|
// is mutated by calls to Next.
|
||||||
|
currentItem *BufferItem
|
||||||
|
|
||||||
|
// ctx is the Subscription context that wraps the context of the streaming RPC
|
||||||
|
// handler call.
|
||||||
|
ctx context.Context
|
||||||
|
|
||||||
|
// cancelFn stores the context cancel function that will wake up the
|
||||||
|
// in-progress Next call on a server-initiated state change e.g. Reload.
|
||||||
|
cancelFn func()
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewSubscription return a new subscription.
|
||||||
|
func NewSubscription(ctx context.Context, req *agentpb.SubscribeRequest, item *BufferItem) *Subscription {
|
||||||
|
subCtx, cancel := context.WithCancel(ctx)
|
||||||
|
return &Subscription{
|
||||||
|
ctx: subCtx,
|
||||||
|
cancelFn: cancel,
|
||||||
|
req: req,
|
||||||
|
currentItem: item,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Next returns the next set of events to deliver. It must only be called from a
|
||||||
|
// single goroutine concurrently as it mutates the Subscription.
|
||||||
|
func (s *Subscription) Next() ([]agentpb.Event, error) {
|
||||||
|
state := atomic.LoadUint32(&s.state)
|
||||||
|
if state == SubscriptionStateCloseReload {
|
||||||
|
return nil, ErrSubscriptionReload
|
||||||
|
}
|
||||||
|
|
||||||
|
for {
|
||||||
|
next, err := s.currentItem.Next(s.ctx)
|
||||||
|
if err != nil {
|
||||||
|
// Check we didn't return because of a state change cancelling the context
|
||||||
|
state := atomic.LoadUint32(&s.state)
|
||||||
|
if state == SubscriptionStateCloseReload {
|
||||||
|
return nil, ErrSubscriptionReload
|
||||||
|
}
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
// Advance our cursor for next loop or next call
|
||||||
|
s.currentItem = next
|
||||||
|
|
||||||
|
// Assume happy path where all events (or none) are relevant.
|
||||||
|
allMatch := true
|
||||||
|
|
||||||
|
// If there is a specific key, see if we need to filter any events
|
||||||
|
if s.req.Key != "" {
|
||||||
|
for _, e := range next.Events {
|
||||||
|
if s.req.Key != e.Key {
|
||||||
|
allMatch = false
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Only if we need to filter events should we bother allocating a new slice
|
||||||
|
// as this is a hot loop.
|
||||||
|
events := next.Events
|
||||||
|
if !allMatch {
|
||||||
|
events = make([]agentpb.Event, 0, len(next.Events))
|
||||||
|
for _, e := range next.Events {
|
||||||
|
// Only return it if the key matches.
|
||||||
|
if s.req.Key == "" || s.req.Key == e.Key {
|
||||||
|
events = append(events, e)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(events) > 0 {
|
||||||
|
return events, nil
|
||||||
|
}
|
||||||
|
// Keep looping until we find some events we are interested in.
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// CloseReload closes the stream and signals that the subscriber should reload.
|
||||||
|
// It is safe to call from any goroutine.
|
||||||
|
func (s *Subscription) CloseReload() {
|
||||||
|
swapped := atomic.CompareAndSwapUint32(&s.state, SubscriptionStateOpen,
|
||||||
|
SubscriptionStateCloseReload)
|
||||||
|
|
||||||
|
if swapped {
|
||||||
|
s.cancelFn()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Request returns the request object that started the subscription.
|
||||||
|
func (s *Subscription) Request() *agentpb.SubscribeRequest {
|
||||||
|
return s.req
|
||||||
|
}
|
|
@ -0,0 +1,152 @@
|
||||||
|
package stream
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"testing"
|
||||||
|
time "time"
|
||||||
|
|
||||||
|
"github.com/hashicorp/consul/agent/agentpb"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestSubscription(t *testing.T) {
|
||||||
|
eb := NewEventBuffer()
|
||||||
|
|
||||||
|
index := uint64(100)
|
||||||
|
|
||||||
|
startHead := eb.Head()
|
||||||
|
|
||||||
|
// Start with an event in the buffer
|
||||||
|
testPublish(index, eb, "test")
|
||||||
|
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
// Create a subscription
|
||||||
|
req := &agentpb.SubscribeRequest{
|
||||||
|
Topic: agentpb.Topic_ServiceHealth,
|
||||||
|
Key: "test",
|
||||||
|
}
|
||||||
|
sub := NewSubscription(ctx, req, startHead)
|
||||||
|
|
||||||
|
// First call to sub.Next should return our published event immediately
|
||||||
|
start := time.Now()
|
||||||
|
got, err := sub.Next()
|
||||||
|
elapsed := time.Since(start)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.True(t, elapsed < 200*time.Millisecond,
|
||||||
|
"Event should have been delivered immediately, took %s", elapsed)
|
||||||
|
require.Len(t, got, 1)
|
||||||
|
require.Equal(t, index, got[0].Index)
|
||||||
|
|
||||||
|
// Schedule an event publish in a while
|
||||||
|
index++
|
||||||
|
start = time.Now()
|
||||||
|
time.AfterFunc(200*time.Millisecond, func() {
|
||||||
|
testPublish(index, eb, "test")
|
||||||
|
})
|
||||||
|
|
||||||
|
// Next call should block until event is delivered
|
||||||
|
got, err = sub.Next()
|
||||||
|
elapsed = time.Since(start)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.True(t, elapsed > 200*time.Millisecond,
|
||||||
|
"Event should have been delivered after blocking 200ms, took %s", elapsed)
|
||||||
|
require.True(t, elapsed < 2*time.Second,
|
||||||
|
"Event should have been delivered after short time, took %s", elapsed)
|
||||||
|
require.Len(t, got, 1)
|
||||||
|
require.Equal(t, index, got[0].Index)
|
||||||
|
|
||||||
|
// Event with wrong key should not be delivered. Deliver a good message right
|
||||||
|
// so we don't have to block test thread forever or cancel func yet.
|
||||||
|
index++
|
||||||
|
testPublish(index, eb, "nope")
|
||||||
|
index++
|
||||||
|
testPublish(index, eb, "test")
|
||||||
|
|
||||||
|
start = time.Now()
|
||||||
|
got, err = sub.Next()
|
||||||
|
elapsed = time.Since(start)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.True(t, elapsed < 200*time.Millisecond,
|
||||||
|
"Event should have been delivered immediately, took %s", elapsed)
|
||||||
|
require.Len(t, got, 1)
|
||||||
|
require.Equal(t, index, got[0].Index)
|
||||||
|
require.Equal(t, "test", got[0].Key)
|
||||||
|
|
||||||
|
// Cancelling the subscription context should unblock Next
|
||||||
|
start = time.Now()
|
||||||
|
time.AfterFunc(200*time.Millisecond, func() {
|
||||||
|
cancel()
|
||||||
|
})
|
||||||
|
|
||||||
|
_, err = sub.Next()
|
||||||
|
elapsed = time.Since(start)
|
||||||
|
require.Error(t, err)
|
||||||
|
require.True(t, elapsed > 200*time.Millisecond,
|
||||||
|
"Event should have been delivered after blocking 200ms, took %s", elapsed)
|
||||||
|
require.True(t, elapsed < 2*time.Second,
|
||||||
|
"Event should have been delivered after short time, took %s", elapsed)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestSubscriptionCloseReload(t *testing.T) {
|
||||||
|
eb := NewEventBuffer()
|
||||||
|
|
||||||
|
index := uint64(100)
|
||||||
|
|
||||||
|
startHead := eb.Head()
|
||||||
|
|
||||||
|
// Start with an event in the buffer
|
||||||
|
testPublish(index, eb, "test")
|
||||||
|
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
// Create a subscription
|
||||||
|
req := &agentpb.SubscribeRequest{
|
||||||
|
Topic: agentpb.Topic_ServiceHealth,
|
||||||
|
Key: "test",
|
||||||
|
}
|
||||||
|
sub := NewSubscription(ctx, req, startHead)
|
||||||
|
|
||||||
|
// First call to sub.Next should return our published event immediately
|
||||||
|
start := time.Now()
|
||||||
|
got, err := sub.Next()
|
||||||
|
elapsed := time.Since(start)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.True(t, elapsed < 200*time.Millisecond,
|
||||||
|
"Event should have been delivered immediately, took %s", elapsed)
|
||||||
|
require.Len(t, got, 1)
|
||||||
|
require.Equal(t, index, got[0].Index)
|
||||||
|
|
||||||
|
// Schedule a CloseReload simulating the server deciding this subscroption
|
||||||
|
// needs to reset (e.g. on ACL perm change).
|
||||||
|
start = time.Now()
|
||||||
|
time.AfterFunc(200*time.Millisecond, func() {
|
||||||
|
sub.CloseReload()
|
||||||
|
})
|
||||||
|
|
||||||
|
_, err = sub.Next()
|
||||||
|
elapsed = time.Since(start)
|
||||||
|
require.Error(t, err)
|
||||||
|
require.Equal(t, ErrSubscriptionReload, err)
|
||||||
|
require.True(t, elapsed > 200*time.Millisecond,
|
||||||
|
"Reload should have happened after blocking 200ms, took %s", elapsed)
|
||||||
|
require.True(t, elapsed < 2*time.Second,
|
||||||
|
"Reload should have been delivered after short time, took %s", elapsed)
|
||||||
|
}
|
||||||
|
|
||||||
|
func testPublish(index uint64, b *EventBuffer, key string) {
|
||||||
|
// Don't care about the event payload for now just the semantics of publising
|
||||||
|
// something. This is not a valid stream in the end-to-end streaming protocol
|
||||||
|
// but enough to test subscription mechanics.
|
||||||
|
e := agentpb.Event{
|
||||||
|
Index: index,
|
||||||
|
Topic: agentpb.Topic_ServiceHealth,
|
||||||
|
Key: key,
|
||||||
|
Payload: &agentpb.Event_EndOfSnapshot{
|
||||||
|
EndOfSnapshot: true,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
b.Append([]agentpb.Event{e})
|
||||||
|
}
|
Loading…
Reference in New Issue