stream: document that Payload must be immutable

If they are sent to EventPublisher.Publish.

Also document that PayloadEvents is expected to come from a subscription and that it is
not immutable.
This commit is contained in:
Daniel Nephin 2020-11-06 13:00:33 -05:00
parent 43af0ba7a3
commit fb70c8bac2
7 changed files with 32 additions and 13 deletions

View File

@ -11,6 +11,10 @@ import (
// EventPayloadCheckServiceNode is used as the Payload for a stream.Event to
// indicates changes to a CheckServiceNode for service health.
//
// The stream.Payload methods implemented by EventPayloadCheckServiceNode are
// do not mutate the payload, making it safe to use in an Event sent to
// stream.EventPublisher.Publish.
type EventPayloadCheckServiceNode struct {
Op pbsubscribe.CatalogOp
Value *structs.CheckServiceNode

View File

@ -22,6 +22,9 @@ type Event struct {
Payload Payload
}
// A Payload contains the topic-specific data in an event. The payload methods
// should not modify the state of the payload if the Event is being submitted to
// EventPublisher.Publish.
type Payload interface {
// MatchesKey must return true if the Payload should be included in a subscription
// requested with the key and namespace.
@ -36,12 +39,16 @@ type Payload interface {
HasReadPermission(authz acl.Authorizer) bool
}
// PayloadEvents is an Payload which contains multiple Events.
// PayloadEvents is a Payload that may be returned by Subscription.Next when
// there are multiple events at an index.
//
// Note that unlike most other Payload, PayloadEvents is mutable and it is NOT
// safe to send to EventPublisher.Publish.
type PayloadEvents struct {
Items []Event
}
func NewPayloadEvents(items ...Event) *PayloadEvents {
func newPayloadEvents(items ...Event) *PayloadEvents {
return &PayloadEvents{Items: items}
}
@ -73,6 +80,7 @@ func (p *PayloadEvents) filter(f func(Event) bool) bool {
return true
}
// MatchesKey filters the PayloadEvents to those which match the key and namespace.
func (p *PayloadEvents) MatchesKey(key, namespace string) bool {
return p.filter(func(event Event) bool {
return event.Payload.MatchesKey(key, namespace)
@ -83,6 +91,8 @@ func (p *PayloadEvents) Len() int {
return len(p.Items)
}
// HasReadPermission filters the PayloadEvents to those which are authorized
// for reading by authz.
func (p *PayloadEvents) HasReadPermission(authz acl.Authorizer) bool {
return p.filter(func(event Event) bool {
return event.Payload.HasReadPermission(authz)

View File

@ -91,7 +91,8 @@ func NewEventPublisher(handlers SnapshotHandlers, snapCacheTTL time.Duration) *E
return e
}
// Publish events to all subscribers of the event Topic.
// Publish events to all subscribers of the event Topic. The events will be shared
// with all subscriptions, so the Payload used in Event.Payload must be immutable.
func (e *EventPublisher) Publish(events []Event) {
if len(events) > 0 {
e.publishCh <- events

View File

@ -54,7 +54,7 @@ func TestPayloadEvents_FilterByKey(t *testing.T) {
newSimpleEvent("One", 102),
newSimpleEvent("Two", 102)},
expectEvent: true,
expected: NewPayloadEvents(
expected: newPayloadEvents(
newSimpleEvent("One", 102),
newSimpleEvent("Two", 102)),
expectedCap: 5,
@ -66,7 +66,7 @@ func TestPayloadEvents_FilterByKey(t *testing.T) {
newSimpleEvent("Same", 103),
newSimpleEvent("Same", 103)},
expectEvent: true,
expected: NewPayloadEvents(
expected: newPayloadEvents(
newSimpleEvent("Same", 103),
newSimpleEvent("Same", 103)),
expectedCap: 5,
@ -78,7 +78,7 @@ func TestPayloadEvents_FilterByKey(t *testing.T) {
newNSEvent("Something", "apps"),
newNSEvent("Other", "apps")},
expectEvent: true,
expected: NewPayloadEvents(
expected: newPayloadEvents(
newNSEvent("Something", "apps"),
newNSEvent("Other", "apps")),
expectedCap: 5,
@ -91,7 +91,7 @@ func TestPayloadEvents_FilterByKey(t *testing.T) {
newSimpleEvent("Other", 104),
newSimpleEvent("Same", 104)},
expectEvent: true,
expected: NewPayloadEvents(
expected: newPayloadEvents(
newSimpleEvent("Same", 104),
newSimpleEvent("Same", 104)),
expectedCap: 2,
@ -104,7 +104,7 @@ func TestPayloadEvents_FilterByKey(t *testing.T) {
newNSEvent("db1", "dbs"),
newNSEvent("app2", "apps")},
expectEvent: true,
expected: NewPayloadEvents(
expected: newPayloadEvents(
newNSEvent("app1", "apps"),
newNSEvent("app2", "apps")),
expectedCap: 2,
@ -150,7 +150,7 @@ func (p nsPayload) MatchesKey(key, namespace string) bool {
func TestPayloadEvents_HasReadPermission(t *testing.T) {
t.Run("some events filtered", func(t *testing.T) {
ep := NewPayloadEvents(
ep := newPayloadEvents(
Event{Payload: simplePayload{key: "one", noReadPerm: true}},
Event{Payload: simplePayload{key: "two", noReadPerm: false}},
Event{Payload: simplePayload{key: "three", noReadPerm: true}},
@ -165,7 +165,7 @@ func TestPayloadEvents_HasReadPermission(t *testing.T) {
})
t.Run("all events filtered", func(t *testing.T) {
ep := NewPayloadEvents(
ep := newPayloadEvents(
Event{Payload: simplePayload{key: "one", noReadPerm: true}},
Event{Payload: simplePayload{key: "two", noReadPerm: true}},
Event{Payload: simplePayload{key: "three", noReadPerm: true}},

View File

@ -128,7 +128,7 @@ func newEventFromBatch(req SubscribeRequest, events []Event) Event {
return Event{
Topic: req.Topic,
Index: first.Index,
Payload: NewPayloadEvents(events...),
Payload: newPayloadEvents(events...),
}
}

View File

@ -159,7 +159,7 @@ func TestNewEventsFromBatch(t *testing.T) {
expected := Event{
Topic: testTopic,
Index: 9999,
Payload: NewPayloadEvents(events...),
Payload: newPayloadEvents(events...),
}
require.Equal(t, expected, e)
})

View File

@ -917,7 +917,7 @@ func TestNewEventFromSteamEvent(t *testing.T) {
name: "event batch",
event: stream.Event{
Index: 2002,
Payload: stream.NewPayloadEvents(
Payload: newPayloadEvents(
stream.Event{
Index: 2002,
Payload: state.EventPayloadCheckServiceNode{
@ -1007,6 +1007,10 @@ func TestNewEventFromSteamEvent(t *testing.T) {
}
}
func newPayloadEvents(items ...stream.Event) *stream.PayloadEvents {
return &stream.PayloadEvents{Items: items}
}
// newEventFromSubscription is used to return framing events. EndOfSnapshot and
// NewSnapshotToFollow are not exported, but we can get them from a subscription.
func newEventFromSubscription(t *testing.T, index uint64) stream.Event {