mirror of https://github.com/status-im/consul.git
Merge pull request #9114 from hashicorp/dnephin/filtering-in-stream
stream: improve naming of Payload methods
This commit is contained in:
parent
5521be526b
commit
9b904de406
|
@ -3,6 +3,7 @@ package state
|
||||||
import (
|
import (
|
||||||
memdb "github.com/hashicorp/go-memdb"
|
memdb "github.com/hashicorp/go-memdb"
|
||||||
|
|
||||||
|
"github.com/hashicorp/consul/acl"
|
||||||
"github.com/hashicorp/consul/agent/consul/stream"
|
"github.com/hashicorp/consul/agent/consul/stream"
|
||||||
"github.com/hashicorp/consul/agent/structs"
|
"github.com/hashicorp/consul/agent/structs"
|
||||||
"github.com/hashicorp/consul/proto/pbsubscribe"
|
"github.com/hashicorp/consul/proto/pbsubscribe"
|
||||||
|
@ -10,6 +11,10 @@ import (
|
||||||
|
|
||||||
// EventPayloadCheckServiceNode is used as the Payload for a stream.Event to
|
// EventPayloadCheckServiceNode is used as the Payload for a stream.Event to
|
||||||
// indicates changes to a CheckServiceNode for service health.
|
// indicates changes to a CheckServiceNode for service health.
|
||||||
|
//
|
||||||
|
// The stream.Payload methods implemented by EventPayloadCheckServiceNode are
|
||||||
|
// do not mutate the payload, making it safe to use in an Event sent to
|
||||||
|
// stream.EventPublisher.Publish.
|
||||||
type EventPayloadCheckServiceNode struct {
|
type EventPayloadCheckServiceNode struct {
|
||||||
Op pbsubscribe.CatalogOp
|
Op pbsubscribe.CatalogOp
|
||||||
Value *structs.CheckServiceNode
|
Value *structs.CheckServiceNode
|
||||||
|
@ -19,7 +24,11 @@ type EventPayloadCheckServiceNode struct {
|
||||||
key string
|
key string
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e EventPayloadCheckServiceNode) FilterByKey(key, namespace string) bool {
|
func (e EventPayloadCheckServiceNode) HasReadPermission(authz acl.Authorizer) bool {
|
||||||
|
return e.Value.CanRead(authz) == acl.Allow
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e EventPayloadCheckServiceNode) MatchesKey(key, namespace string) bool {
|
||||||
if key == "" && namespace == "" {
|
if key == "" && namespace == "" {
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
|
@ -1476,7 +1476,7 @@ func TestEventPayloadCheckServiceNode_FilterByKey(t *testing.T) {
|
||||||
t.Skip("cant test namespace matching without namespace support")
|
t.Skip("cant test namespace matching without namespace support")
|
||||||
}
|
}
|
||||||
|
|
||||||
require.Equal(t, tc.expected, tc.payload.FilterByKey(tc.key, tc.namespace))
|
require.Equal(t, tc.expected, tc.payload.MatchesKey(tc.key, tc.namespace))
|
||||||
}
|
}
|
||||||
|
|
||||||
var testCases = []testCase{
|
var testCases = []testCase{
|
||||||
|
|
|
@ -410,10 +410,14 @@ type nodePayload struct {
|
||||||
node *structs.ServiceNode
|
node *structs.ServiceNode
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p nodePayload) FilterByKey(key, _ string) bool {
|
func (p nodePayload) MatchesKey(key, _ string) bool {
|
||||||
return p.key == key
|
return p.key == key
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (p nodePayload) HasReadPermission(acl.Authorizer) bool {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
func createTokenAndWaitForACLEventPublish(t *testing.T, s *Store) *structs.ACLToken {
|
func createTokenAndWaitForACLEventPublish(t *testing.T, s *Store) *structs.ACLToken {
|
||||||
token := &structs.ACLToken{
|
token := &structs.ACLToken{
|
||||||
AccessorID: "3af117a9-2233-4cf4-8ff8-3c749c9906b4",
|
AccessorID: "3af117a9-2233-4cf4-8ff8-3c749c9906b4",
|
||||||
|
|
|
@ -4,7 +4,11 @@ to the state store.
|
||||||
*/
|
*/
|
||||||
package stream
|
package stream
|
||||||
|
|
||||||
import "fmt"
|
import (
|
||||||
|
"fmt"
|
||||||
|
|
||||||
|
"github.com/hashicorp/consul/acl"
|
||||||
|
)
|
||||||
|
|
||||||
// Topic is an identifier that partitions events. A subscription will only receive
|
// Topic is an identifier that partitions events. A subscription will only receive
|
||||||
// events which match the Topic.
|
// events which match the Topic.
|
||||||
|
@ -18,72 +22,81 @@ type Event struct {
|
||||||
Payload Payload
|
Payload Payload
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// A Payload contains the topic-specific data in an event. The payload methods
|
||||||
|
// should not modify the state of the payload if the Event is being submitted to
|
||||||
|
// EventPublisher.Publish.
|
||||||
type Payload interface {
|
type Payload interface {
|
||||||
// FilterByKey must return true if the Payload should be included in a subscription
|
// MatchesKey must return true if the Payload should be included in a subscription
|
||||||
// requested with the key and namespace.
|
// requested with the key and namespace.
|
||||||
// Generally this means that the payload matches the key and namespace or
|
// Generally this means that the payload matches the key and namespace or
|
||||||
// the payload is a special framing event that should be returned to every
|
// the payload is a special framing event that should be returned to every
|
||||||
// subscription.
|
// subscription.
|
||||||
FilterByKey(key, namespace string) bool
|
MatchesKey(key, namespace string) bool
|
||||||
|
|
||||||
|
// HasReadPermission uses the acl.Authorizer to determine if the items in the
|
||||||
|
// Payload are visible to the request. It returns true if the payload is
|
||||||
|
// authorized for Read, otherwise returns false.
|
||||||
|
HasReadPermission(authz acl.Authorizer) bool
|
||||||
}
|
}
|
||||||
|
|
||||||
// Len returns the number of events contained within this event. If the Payload
|
// PayloadEvents is a Payload that may be returned by Subscription.Next when
|
||||||
// is a []Event, the length of that slice is returned. Otherwise 1 is returned.
|
// there are multiple events at an index.
|
||||||
func (e Event) Len() int {
|
//
|
||||||
if batch, ok := e.Payload.(PayloadEvents); ok {
|
// Note that unlike most other Payload, PayloadEvents is mutable and it is NOT
|
||||||
return len(batch)
|
// safe to send to EventPublisher.Publish.
|
||||||
}
|
type PayloadEvents struct {
|
||||||
return 1
|
Items []Event
|
||||||
}
|
}
|
||||||
|
|
||||||
// Filter returns an Event filtered to only those Events where f returns true.
|
func newPayloadEvents(items ...Event) *PayloadEvents {
|
||||||
// If the second return value is false, every Event was removed by the filter.
|
return &PayloadEvents{Items: items}
|
||||||
func (e Event) Filter(f func(Event) bool) (Event, bool) {
|
|
||||||
batch, ok := e.Payload.(PayloadEvents)
|
|
||||||
if !ok {
|
|
||||||
return e, f(e)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (p *PayloadEvents) filter(f func(Event) bool) bool {
|
||||||
|
items := p.Items
|
||||||
|
|
||||||
// To avoid extra allocations, iterate over the list of events first and
|
// To avoid extra allocations, iterate over the list of events first and
|
||||||
// get a count of the total desired size. This trades off some extra cpu
|
// get a count of the total desired size. This trades off some extra cpu
|
||||||
// time in the worse case (when not all items match the filter), for
|
// time in the worse case (when not all items match the filter), for
|
||||||
// fewer memory allocations.
|
// fewer memory allocations.
|
||||||
var size int
|
var size int
|
||||||
for idx := range batch {
|
for idx := range items {
|
||||||
if f(batch[idx]) {
|
if f(items[idx]) {
|
||||||
size++
|
size++
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if len(batch) == size || size == 0 {
|
if len(items) == size || size == 0 {
|
||||||
return e, size != 0
|
return size != 0
|
||||||
}
|
}
|
||||||
|
|
||||||
filtered := make(PayloadEvents, 0, size)
|
filtered := make([]Event, 0, size)
|
||||||
for idx := range batch {
|
for idx := range items {
|
||||||
event := batch[idx]
|
event := items[idx]
|
||||||
if f(event) {
|
if f(event) {
|
||||||
filtered = append(filtered, event)
|
filtered = append(filtered, event)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if len(filtered) == 0 {
|
p.Items = filtered
|
||||||
return e, false
|
|
||||||
}
|
|
||||||
e.Payload = filtered
|
|
||||||
return e, true
|
|
||||||
}
|
|
||||||
|
|
||||||
// PayloadEvents is an Payload which contains multiple Events.
|
|
||||||
type PayloadEvents []Event
|
|
||||||
|
|
||||||
// TODO: this method is not called, but needs to exist so that we can store
|
|
||||||
// a slice of events as a payload. In the future we should be able to refactor
|
|
||||||
// Event.Filter so that this FilterByKey includes the re-slicing.
|
|
||||||
func (e PayloadEvents) FilterByKey(_, _ string) bool {
|
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e PayloadEvents) Events() []Event {
|
// MatchesKey filters the PayloadEvents to those which match the key and namespace.
|
||||||
return e
|
func (p *PayloadEvents) MatchesKey(key, namespace string) bool {
|
||||||
|
return p.filter(func(event Event) bool {
|
||||||
|
return event.Payload.MatchesKey(key, namespace)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *PayloadEvents) Len() int {
|
||||||
|
return len(p.Items)
|
||||||
|
}
|
||||||
|
|
||||||
|
// HasReadPermission filters the PayloadEvents to those which are authorized
|
||||||
|
// for reading by authz.
|
||||||
|
func (p *PayloadEvents) HasReadPermission(authz acl.Authorizer) bool {
|
||||||
|
return p.filter(func(event Event) bool {
|
||||||
|
return event.Payload.HasReadPermission(authz)
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
// IsEndOfSnapshot returns true if this is a framing event that indicates the
|
// IsEndOfSnapshot returns true if this is a framing event that indicates the
|
||||||
|
@ -100,24 +113,34 @@ func (e Event) IsNewSnapshotToFollow() bool {
|
||||||
return e.Payload == newSnapshotToFollow{}
|
return e.Payload == newSnapshotToFollow{}
|
||||||
}
|
}
|
||||||
|
|
||||||
type endOfSnapshot struct{}
|
type framingEvent struct{}
|
||||||
|
|
||||||
func (endOfSnapshot) FilterByKey(string, string) bool {
|
func (framingEvent) MatchesKey(string, string) bool {
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
type newSnapshotToFollow struct{}
|
func (framingEvent) HasReadPermission(acl.Authorizer) bool {
|
||||||
|
|
||||||
func (newSnapshotToFollow) FilterByKey(string, string) bool {
|
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type endOfSnapshot struct {
|
||||||
|
framingEvent
|
||||||
|
}
|
||||||
|
|
||||||
|
type newSnapshotToFollow struct {
|
||||||
|
framingEvent
|
||||||
|
}
|
||||||
|
|
||||||
type closeSubscriptionPayload struct {
|
type closeSubscriptionPayload struct {
|
||||||
tokensSecretIDs []string
|
tokensSecretIDs []string
|
||||||
}
|
}
|
||||||
|
|
||||||
func (closeSubscriptionPayload) FilterByKey(string, string) bool {
|
func (closeSubscriptionPayload) MatchesKey(string, string) bool {
|
||||||
return true
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
func (closeSubscriptionPayload) HasReadPermission(acl.Authorizer) bool {
|
||||||
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewCloseSubscriptionEvent returns a special Event that is handled by the
|
// NewCloseSubscriptionEvent returns a special Event that is handled by the
|
||||||
|
|
|
@ -91,7 +91,8 @@ func NewEventPublisher(handlers SnapshotHandlers, snapCacheTTL time.Duration) *E
|
||||||
return e
|
return e
|
||||||
}
|
}
|
||||||
|
|
||||||
// Publish events to all subscribers of the event Topic.
|
// Publish events to all subscribers of the event Topic. The events will be shared
|
||||||
|
// with all subscriptions, so the Payload used in Event.Payload must be immutable.
|
||||||
func (e *EventPublisher) Publish(events []Event) {
|
func (e *EventPublisher) Publish(events []Event) {
|
||||||
if len(events) > 0 {
|
if len(events) > 0 {
|
||||||
e.publishCh <- events
|
e.publishCh <- events
|
||||||
|
|
|
@ -7,6 +7,8 @@ import (
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
|
|
||||||
|
"github.com/hashicorp/consul/acl"
|
||||||
)
|
)
|
||||||
|
|
||||||
type intTopic int
|
type intTopic int
|
||||||
|
@ -65,15 +67,20 @@ var testSnapshotEvent = Event{
|
||||||
type simplePayload struct {
|
type simplePayload struct {
|
||||||
key string
|
key string
|
||||||
value string
|
value string
|
||||||
|
noReadPerm bool
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p simplePayload) FilterByKey(key, _ string) bool {
|
func (p simplePayload) MatchesKey(key, _ string) bool {
|
||||||
if key == "" {
|
if key == "" {
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
return p.key == key
|
return p.key == key
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (p simplePayload) HasReadPermission(acl.Authorizer) bool {
|
||||||
|
return !p.noReadPerm
|
||||||
|
}
|
||||||
|
|
||||||
func newTestSnapshotHandlers() SnapshotHandlers {
|
func newTestSnapshotHandlers() SnapshotHandlers {
|
||||||
return SnapshotHandlers{
|
return SnapshotHandlers{
|
||||||
testTopic: func(req SubscribeRequest, buf SnapshotAppender) (uint64, error) {
|
testTopic: func(req SubscribeRequest, buf SnapshotAppender) (uint64, error) {
|
||||||
|
|
|
@ -15,3 +15,163 @@ func TestEvent_IsEndOfSnapshot(t *testing.T) {
|
||||||
require.False(t, e.IsEndOfSnapshot())
|
require.False(t, e.IsEndOfSnapshot())
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func newSimpleEvent(key string, index uint64) Event {
|
||||||
|
return Event{Index: index, Payload: simplePayload{key: key}}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestPayloadEvents_FilterByKey(t *testing.T) {
|
||||||
|
type testCase struct {
|
||||||
|
name string
|
||||||
|
req SubscribeRequest
|
||||||
|
events []Event
|
||||||
|
expectEvent bool
|
||||||
|
expected *PayloadEvents
|
||||||
|
expectedCap int
|
||||||
|
}
|
||||||
|
|
||||||
|
fn := func(t *testing.T, tc testCase) {
|
||||||
|
events := make([]Event, 0, 5)
|
||||||
|
events = append(events, tc.events...)
|
||||||
|
|
||||||
|
pe := &PayloadEvents{Items: events}
|
||||||
|
ok := pe.MatchesKey(tc.req.Key, tc.req.Namespace)
|
||||||
|
require.Equal(t, tc.expectEvent, ok)
|
||||||
|
if !tc.expectEvent {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
require.Equal(t, tc.expected, pe)
|
||||||
|
// test if there was a new array allocated or not
|
||||||
|
require.Equal(t, tc.expectedCap, cap(pe.Items))
|
||||||
|
}
|
||||||
|
|
||||||
|
var testCases = []testCase{
|
||||||
|
{
|
||||||
|
name: "all events match, no key or namespace",
|
||||||
|
req: SubscribeRequest{Topic: testTopic},
|
||||||
|
events: []Event{
|
||||||
|
newSimpleEvent("One", 102),
|
||||||
|
newSimpleEvent("Two", 102)},
|
||||||
|
expectEvent: true,
|
||||||
|
expected: newPayloadEvents(
|
||||||
|
newSimpleEvent("One", 102),
|
||||||
|
newSimpleEvent("Two", 102)),
|
||||||
|
expectedCap: 5,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "all events match, no namespace",
|
||||||
|
req: SubscribeRequest{Topic: testTopic, Key: "Same"},
|
||||||
|
events: []Event{
|
||||||
|
newSimpleEvent("Same", 103),
|
||||||
|
newSimpleEvent("Same", 103)},
|
||||||
|
expectEvent: true,
|
||||||
|
expected: newPayloadEvents(
|
||||||
|
newSimpleEvent("Same", 103),
|
||||||
|
newSimpleEvent("Same", 103)),
|
||||||
|
expectedCap: 5,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "all events match, no key",
|
||||||
|
req: SubscribeRequest{Topic: testTopic, Namespace: "apps"},
|
||||||
|
events: []Event{
|
||||||
|
newNSEvent("Something", "apps"),
|
||||||
|
newNSEvent("Other", "apps")},
|
||||||
|
expectEvent: true,
|
||||||
|
expected: newPayloadEvents(
|
||||||
|
newNSEvent("Something", "apps"),
|
||||||
|
newNSEvent("Other", "apps")),
|
||||||
|
expectedCap: 5,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "some evens match, no namespace",
|
||||||
|
req: SubscribeRequest{Topic: testTopic, Key: "Same"},
|
||||||
|
events: []Event{
|
||||||
|
newSimpleEvent("Same", 104),
|
||||||
|
newSimpleEvent("Other", 104),
|
||||||
|
newSimpleEvent("Same", 104)},
|
||||||
|
expectEvent: true,
|
||||||
|
expected: newPayloadEvents(
|
||||||
|
newSimpleEvent("Same", 104),
|
||||||
|
newSimpleEvent("Same", 104)),
|
||||||
|
expectedCap: 2,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "some events match, no key",
|
||||||
|
req: SubscribeRequest{Topic: testTopic, Namespace: "apps"},
|
||||||
|
events: []Event{
|
||||||
|
newNSEvent("app1", "apps"),
|
||||||
|
newNSEvent("db1", "dbs"),
|
||||||
|
newNSEvent("app2", "apps")},
|
||||||
|
expectEvent: true,
|
||||||
|
expected: newPayloadEvents(
|
||||||
|
newNSEvent("app1", "apps"),
|
||||||
|
newNSEvent("app2", "apps")),
|
||||||
|
expectedCap: 2,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "no events match key",
|
||||||
|
req: SubscribeRequest{Topic: testTopic, Key: "Other"},
|
||||||
|
events: []Event{
|
||||||
|
newSimpleEvent("Same", 0),
|
||||||
|
newSimpleEvent("Same", 0)},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "no events match namespace",
|
||||||
|
req: SubscribeRequest{Topic: testTopic, Namespace: "apps"},
|
||||||
|
events: []Event{
|
||||||
|
newNSEvent("app1", "group1"),
|
||||||
|
newNSEvent("app2", "group2")},
|
||||||
|
expectEvent: false,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tc := range testCases {
|
||||||
|
t.Run(tc.name, func(t *testing.T) {
|
||||||
|
fn(t, tc)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func newNSEvent(key, namespace string) Event {
|
||||||
|
return Event{Index: 22, Payload: nsPayload{key: key, namespace: namespace}}
|
||||||
|
}
|
||||||
|
|
||||||
|
type nsPayload struct {
|
||||||
|
framingEvent
|
||||||
|
key string
|
||||||
|
namespace string
|
||||||
|
value string
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p nsPayload) MatchesKey(key, namespace string) bool {
|
||||||
|
return (key == "" || key == p.key) && (namespace == "" || namespace == p.namespace)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestPayloadEvents_HasReadPermission(t *testing.T) {
|
||||||
|
t.Run("some events filtered", func(t *testing.T) {
|
||||||
|
ep := newPayloadEvents(
|
||||||
|
Event{Payload: simplePayload{key: "one", noReadPerm: true}},
|
||||||
|
Event{Payload: simplePayload{key: "two", noReadPerm: false}},
|
||||||
|
Event{Payload: simplePayload{key: "three", noReadPerm: true}},
|
||||||
|
Event{Payload: simplePayload{key: "four", noReadPerm: false}})
|
||||||
|
|
||||||
|
require.True(t, ep.HasReadPermission(nil))
|
||||||
|
expected := []Event{
|
||||||
|
{Payload: simplePayload{key: "two"}},
|
||||||
|
{Payload: simplePayload{key: "four"}},
|
||||||
|
}
|
||||||
|
require.Equal(t, expected, ep.Items)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("all events filtered", func(t *testing.T) {
|
||||||
|
ep := newPayloadEvents(
|
||||||
|
Event{Payload: simplePayload{key: "one", noReadPerm: true}},
|
||||||
|
Event{Payload: simplePayload{key: "two", noReadPerm: true}},
|
||||||
|
Event{Payload: simplePayload{key: "three", noReadPerm: true}},
|
||||||
|
Event{Payload: simplePayload{key: "four", noReadPerm: true}})
|
||||||
|
|
||||||
|
require.False(t, ep.HasReadPermission(nil))
|
||||||
|
})
|
||||||
|
|
||||||
|
}
|
||||||
|
|
|
@ -101,8 +101,8 @@ func (s *Subscription) Next(ctx context.Context) (Event, error) {
|
||||||
if len(next.Events) == 0 {
|
if len(next.Events) == 0 {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
event, ok := filterByKey(s.req, next.Events)
|
event := newEventFromBatch(s.req, next.Events)
|
||||||
if !ok {
|
if !event.Payload.MatchesKey(s.req.Key, s.req.Namespace) {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
return event, nil
|
return event, nil
|
||||||
|
@ -128,22 +128,10 @@ func newEventFromBatch(req SubscribeRequest, events []Event) Event {
|
||||||
return Event{
|
return Event{
|
||||||
Topic: req.Topic,
|
Topic: req.Topic,
|
||||||
Index: first.Index,
|
Index: first.Index,
|
||||||
Payload: PayloadEvents(events),
|
Payload: newPayloadEvents(events...),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func filterByKey(req SubscribeRequest, events []Event) (Event, bool) {
|
|
||||||
event := newEventFromBatch(req, events)
|
|
||||||
if req.Key == "" && req.Namespace == "" {
|
|
||||||
return event, true
|
|
||||||
}
|
|
||||||
|
|
||||||
fn := func(e Event) bool {
|
|
||||||
return e.Payload.FilterByKey(req.Key, req.Namespace)
|
|
||||||
}
|
|
||||||
return event.Filter(fn)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Close the subscription. Subscribers will receive an error when they call Next,
|
// Close the subscription. Subscribers will receive an error when they call Next,
|
||||||
// and will need to perform a new Subscribe request.
|
// and will need to perform a new Subscribe request.
|
||||||
// It is safe to call from any goroutine.
|
// It is safe to call from any goroutine.
|
||||||
|
|
|
@ -138,147 +138,29 @@ func publishTestEvent(index uint64, b *eventBuffer, key string) {
|
||||||
b.Append([]Event{e})
|
b.Append([]Event{e})
|
||||||
}
|
}
|
||||||
|
|
||||||
func newSimpleEvent(key string, index uint64) Event {
|
func TestNewEventsFromBatch(t *testing.T) {
|
||||||
return Event{Index: index, Payload: simplePayload{key: key}}
|
t.Run("single item", func(t *testing.T) {
|
||||||
}
|
first := Event{
|
||||||
|
|
||||||
func TestFilterByKey(t *testing.T) {
|
|
||||||
type testCase struct {
|
|
||||||
name string
|
|
||||||
req SubscribeRequest
|
|
||||||
events []Event
|
|
||||||
expectEvent bool
|
|
||||||
expected Event
|
|
||||||
expectedCap int
|
|
||||||
}
|
|
||||||
|
|
||||||
fn := func(t *testing.T, tc testCase) {
|
|
||||||
events := make(PayloadEvents, 0, 5)
|
|
||||||
events = append(events, tc.events...)
|
|
||||||
|
|
||||||
actual, ok := filterByKey(tc.req, events)
|
|
||||||
require.Equal(t, tc.expectEvent, ok)
|
|
||||||
if !tc.expectEvent {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
require.Equal(t, tc.expected, actual)
|
|
||||||
// test if there was a new array allocated or not
|
|
||||||
require.Equal(t, tc.expectedCap, cap(actual.Payload.(PayloadEvents)))
|
|
||||||
}
|
|
||||||
|
|
||||||
var testCases = []testCase{
|
|
||||||
{
|
|
||||||
name: "all events match, no key or namespace",
|
|
||||||
req: SubscribeRequest{Topic: testTopic},
|
|
||||||
events: []Event{
|
|
||||||
newSimpleEvent("One", 102),
|
|
||||||
newSimpleEvent("Two", 102)},
|
|
||||||
expectEvent: true,
|
|
||||||
expected: Event{
|
|
||||||
Topic: testTopic,
|
Topic: testTopic,
|
||||||
Index: 102,
|
Index: 1234,
|
||||||
Payload: PayloadEvents{
|
Payload: simplePayload{key: "key"},
|
||||||
newSimpleEvent("One", 102),
|
|
||||||
newSimpleEvent("Two", 102)}},
|
|
||||||
expectedCap: 5,
|
|
||||||
},
|
|
||||||
{
|
|
||||||
name: "all events match, no namespace",
|
|
||||||
req: SubscribeRequest{Topic: testTopic, Key: "Same"},
|
|
||||||
events: []Event{
|
|
||||||
newSimpleEvent("Same", 103),
|
|
||||||
newSimpleEvent("Same", 103)},
|
|
||||||
expectEvent: true,
|
|
||||||
expected: Event{
|
|
||||||
Topic: testTopic,
|
|
||||||
Index: 103,
|
|
||||||
Payload: PayloadEvents{
|
|
||||||
newSimpleEvent("Same", 103),
|
|
||||||
newSimpleEvent("Same", 103)}},
|
|
||||||
expectedCap: 5,
|
|
||||||
},
|
|
||||||
{
|
|
||||||
name: "all events match, no key",
|
|
||||||
req: SubscribeRequest{Topic: testTopic, Namespace: "apps"},
|
|
||||||
events: []Event{
|
|
||||||
newNSEvent("Something", "apps"),
|
|
||||||
newNSEvent("Other", "apps")},
|
|
||||||
expectEvent: true,
|
|
||||||
expected: Event{
|
|
||||||
Topic: testTopic,
|
|
||||||
Index: 22,
|
|
||||||
Payload: PayloadEvents{
|
|
||||||
newNSEvent("Something", "apps"),
|
|
||||||
newNSEvent("Other", "apps")}},
|
|
||||||
expectedCap: 5,
|
|
||||||
},
|
|
||||||
{
|
|
||||||
name: "some evens match, no namespace",
|
|
||||||
req: SubscribeRequest{Topic: testTopic, Key: "Same"},
|
|
||||||
events: []Event{
|
|
||||||
newSimpleEvent("Same", 104),
|
|
||||||
newSimpleEvent("Other", 104),
|
|
||||||
newSimpleEvent("Same", 104)},
|
|
||||||
expectEvent: true,
|
|
||||||
expected: Event{
|
|
||||||
Topic: testTopic,
|
|
||||||
Index: 104,
|
|
||||||
Payload: PayloadEvents{
|
|
||||||
newSimpleEvent("Same", 104),
|
|
||||||
newSimpleEvent("Same", 104)}},
|
|
||||||
expectedCap: 2,
|
|
||||||
},
|
|
||||||
{
|
|
||||||
name: "some events match, no key",
|
|
||||||
req: SubscribeRequest{Topic: testTopic, Namespace: "apps"},
|
|
||||||
events: []Event{
|
|
||||||
newNSEvent("app1", "apps"),
|
|
||||||
newNSEvent("db1", "dbs"),
|
|
||||||
newNSEvent("app2", "apps")},
|
|
||||||
expectEvent: true,
|
|
||||||
expected: Event{
|
|
||||||
Topic: testTopic,
|
|
||||||
Index: 22,
|
|
||||||
Payload: PayloadEvents{
|
|
||||||
newNSEvent("app1", "apps"),
|
|
||||||
newNSEvent("app2", "apps")}},
|
|
||||||
expectedCap: 2,
|
|
||||||
},
|
|
||||||
{
|
|
||||||
name: "no events match key",
|
|
||||||
req: SubscribeRequest{Topic: testTopic, Key: "Other"},
|
|
||||||
events: []Event{
|
|
||||||
newSimpleEvent("Same", 0),
|
|
||||||
newSimpleEvent("Same", 0)},
|
|
||||||
},
|
|
||||||
{
|
|
||||||
name: "no events match namespace",
|
|
||||||
req: SubscribeRequest{Topic: testTopic, Namespace: "apps"},
|
|
||||||
events: []Event{
|
|
||||||
newNSEvent("app1", "group1"),
|
|
||||||
newNSEvent("app2", "group2")},
|
|
||||||
expectEvent: false,
|
|
||||||
},
|
|
||||||
}
|
}
|
||||||
|
e := newEventFromBatch(SubscribeRequest{}, []Event{first})
|
||||||
for _, tc := range testCases {
|
require.Equal(t, first, e)
|
||||||
t.Run(tc.name, func(t *testing.T) {
|
})
|
||||||
fn(t, tc)
|
t.Run("many items", func(t *testing.T) {
|
||||||
|
events := []Event{
|
||||||
|
newSimpleEvent("foo", 9999),
|
||||||
|
newSimpleEvent("foo", 9999),
|
||||||
|
newSimpleEvent("zee", 9999),
|
||||||
|
}
|
||||||
|
req := SubscribeRequest{Topic: testTopic}
|
||||||
|
e := newEventFromBatch(req, events)
|
||||||
|
expected := Event{
|
||||||
|
Topic: testTopic,
|
||||||
|
Index: 9999,
|
||||||
|
Payload: newPayloadEvents(events...),
|
||||||
|
}
|
||||||
|
require.Equal(t, expected, e)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
func newNSEvent(key, namespace string) Event {
|
|
||||||
return Event{Index: 22, Payload: nsPayload{key: key, namespace: namespace}}
|
|
||||||
}
|
|
||||||
|
|
||||||
type nsPayload struct {
|
|
||||||
key string
|
|
||||||
namespace string
|
|
||||||
value string
|
|
||||||
}
|
|
||||||
|
|
||||||
func (p nsPayload) FilterByKey(key, namespace string) bool {
|
|
||||||
return (key == "" || key == p.key) && (namespace == "" || namespace == p.namespace)
|
|
||||||
}
|
|
||||||
|
|
|
@ -1,22 +0,0 @@
|
||||||
package subscribe
|
|
||||||
|
|
||||||
import (
|
|
||||||
"github.com/hashicorp/consul/acl"
|
|
||||||
"github.com/hashicorp/consul/agent/consul/state"
|
|
||||||
"github.com/hashicorp/consul/agent/consul/stream"
|
|
||||||
)
|
|
||||||
|
|
||||||
// EnforceACL takes an acl.Authorizer and returns the decision for whether the
|
|
||||||
// event is allowed to be sent to this client or not.
|
|
||||||
func enforceACL(authz acl.Authorizer, e stream.Event) acl.EnforcementDecision {
|
|
||||||
switch {
|
|
||||||
case e.IsEndOfSnapshot(), e.IsNewSnapshotToFollow():
|
|
||||||
return acl.Allow
|
|
||||||
}
|
|
||||||
|
|
||||||
switch p := e.Payload.(type) {
|
|
||||||
case state.EventPayloadCheckServiceNode:
|
|
||||||
return p.Value.CanRead(authz)
|
|
||||||
}
|
|
||||||
return acl.Deny
|
|
||||||
}
|
|
|
@ -58,12 +58,20 @@ func (l *eventLogger) Trace(e stream.Event) {
|
||||||
case e.IsEndOfSnapshot():
|
case e.IsEndOfSnapshot():
|
||||||
l.snapshotDone = true
|
l.snapshotDone = true
|
||||||
l.logger.Trace("snapshot complete", "index", e.Index, "sent", l.count)
|
l.logger.Trace("snapshot complete", "index", e.Index, "sent", l.count)
|
||||||
|
return
|
||||||
case e.IsNewSnapshotToFollow():
|
case e.IsNewSnapshotToFollow():
|
||||||
l.logger.Trace("starting new snapshot", "sent", l.count)
|
l.logger.Trace("starting new snapshot", "sent", l.count)
|
||||||
return
|
return
|
||||||
case l.snapshotDone:
|
|
||||||
l.logger.Trace("sending events", "index", e.Index, "sent", l.count, "batch_size", e.Len())
|
|
||||||
}
|
}
|
||||||
|
|
||||||
l.count += uint64(e.Len())
|
size := 1
|
||||||
|
if l, ok := e.Payload.(length); ok {
|
||||||
|
size = l.Len()
|
||||||
|
}
|
||||||
|
l.logger.Trace("sending events", "index", e.Index, "sent", l.count, "batch_size", size)
|
||||||
|
l.count += uint64(size)
|
||||||
|
}
|
||||||
|
|
||||||
|
type length interface {
|
||||||
|
Len() int
|
||||||
}
|
}
|
||||||
|
|
|
@ -132,10 +132,8 @@ func filterByAuth(authz acl.Authorizer, event stream.Event) (stream.Event, bool)
|
||||||
if authz == nil {
|
if authz == nil {
|
||||||
return event, true
|
return event, true
|
||||||
}
|
}
|
||||||
fn := func(e stream.Event) bool {
|
|
||||||
return enforceACL(authz, e) == acl.Allow
|
return event, event.Payload.HasReadPermission(authz)
|
||||||
}
|
|
||||||
return event.Filter(fn)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func newEventFromStreamEvent(event stream.Event) *pbsubscribe.Event {
|
func newEventFromStreamEvent(event stream.Event) *pbsubscribe.Event {
|
||||||
|
@ -154,10 +152,10 @@ func newEventFromStreamEvent(event stream.Event) *pbsubscribe.Event {
|
||||||
|
|
||||||
func setPayload(e *pbsubscribe.Event, payload stream.Payload) {
|
func setPayload(e *pbsubscribe.Event, payload stream.Payload) {
|
||||||
switch p := payload.(type) {
|
switch p := payload.(type) {
|
||||||
case stream.PayloadEvents:
|
case *stream.PayloadEvents:
|
||||||
e.Payload = &pbsubscribe.Event_EventBatch{
|
e.Payload = &pbsubscribe.Event_EventBatch{
|
||||||
EventBatch: &pbsubscribe.EventBatch{
|
EventBatch: &pbsubscribe.EventBatch{
|
||||||
Events: batchEventsFromEventSlice(p),
|
Events: batchEventsFromEventSlice(p.Items),
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
case state.EventPayloadCheckServiceNode:
|
case state.EventPayloadCheckServiceNode:
|
||||||
|
|
|
@ -917,8 +917,8 @@ func TestNewEventFromSteamEvent(t *testing.T) {
|
||||||
name: "event batch",
|
name: "event batch",
|
||||||
event: stream.Event{
|
event: stream.Event{
|
||||||
Index: 2002,
|
Index: 2002,
|
||||||
Payload: stream.PayloadEvents{
|
Payload: newPayloadEvents(
|
||||||
{
|
stream.Event{
|
||||||
Index: 2002,
|
Index: 2002,
|
||||||
Payload: state.EventPayloadCheckServiceNode{
|
Payload: state.EventPayloadCheckServiceNode{
|
||||||
Op: pbsubscribe.CatalogOp_Register,
|
Op: pbsubscribe.CatalogOp_Register,
|
||||||
|
@ -928,7 +928,7 @@ func TestNewEventFromSteamEvent(t *testing.T) {
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
{
|
stream.Event{
|
||||||
Index: 2002,
|
Index: 2002,
|
||||||
Payload: state.EventPayloadCheckServiceNode{
|
Payload: state.EventPayloadCheckServiceNode{
|
||||||
Op: pbsubscribe.CatalogOp_Deregister,
|
Op: pbsubscribe.CatalogOp_Deregister,
|
||||||
|
@ -937,8 +937,7 @@ func TestNewEventFromSteamEvent(t *testing.T) {
|
||||||
Service: &structs.NodeService{Service: "web1"},
|
Service: &structs.NodeService{Service: "web1"},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
},
|
}),
|
||||||
},
|
|
||||||
},
|
},
|
||||||
expected: pbsubscribe.Event{
|
expected: pbsubscribe.Event{
|
||||||
Index: 2002,
|
Index: 2002,
|
||||||
|
@ -1008,6 +1007,10 @@ func TestNewEventFromSteamEvent(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func newPayloadEvents(items ...stream.Event) *stream.PayloadEvents {
|
||||||
|
return &stream.PayloadEvents{Items: items}
|
||||||
|
}
|
||||||
|
|
||||||
// newEventFromSubscription is used to return framing events. EndOfSnapshot and
|
// newEventFromSubscription is used to return framing events. EndOfSnapshot and
|
||||||
// NewSnapshotToFollow are not exported, but we can get them from a subscription.
|
// NewSnapshotToFollow are not exported, but we can get them from a subscription.
|
||||||
func newEventFromSubscription(t *testing.T, index uint64) stream.Event {
|
func newEventFromSubscription(t *testing.T, index uint64) stream.Event {
|
||||||
|
|
Loading…
Reference in New Issue