mirror of
https://github.com/status-im/consul.git
synced 2025-02-20 09:28:34 +00:00
Merge pull request #9073 from hashicorp/dnephin/backport-streaming-namespaces
streaming: backport namespace changes
This commit is contained in:
commit
a33c50ef0d
@ -73,7 +73,7 @@ func (c *StreamingHealthServices) Fetch(opts cache.FetchOptions, req cache.Reque
|
|||||||
Token: srvReq.Token,
|
Token: srvReq.Token,
|
||||||
Datacenter: srvReq.Datacenter,
|
Datacenter: srvReq.Datacenter,
|
||||||
Index: index,
|
Index: index,
|
||||||
// TODO(streaming): set Namespace from srvReq.EnterpriseMeta.Namespace
|
Namespace: srvReq.EnterpriseMeta.GetNamespace(),
|
||||||
}
|
}
|
||||||
if srvReq.Connect {
|
if srvReq.Connect {
|
||||||
req.Topic = pbsubscribe.Topic_ServiceHealthConnect
|
req.Topic = pbsubscribe.Topic_ServiceHealthConnect
|
||||||
|
@ -14,11 +14,13 @@ import (
|
|||||||
|
|
||||||
"github.com/hashicorp/consul/agent/cache"
|
"github.com/hashicorp/consul/agent/cache"
|
||||||
"github.com/hashicorp/consul/agent/structs"
|
"github.com/hashicorp/consul/agent/structs"
|
||||||
|
"github.com/hashicorp/consul/proto/pbcommon"
|
||||||
"github.com/hashicorp/consul/proto/pbsubscribe"
|
"github.com/hashicorp/consul/proto/pbsubscribe"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestStreamingHealthServices_EmptySnapshot(t *testing.T) {
|
func TestStreamingHealthServices_EmptySnapshot(t *testing.T) {
|
||||||
client := NewTestStreamingClient()
|
namespace := pbcommon.DefaultEnterpriseMeta.Namespace
|
||||||
|
client := NewTestStreamingClient(namespace)
|
||||||
typ := StreamingHealthServices{deps: MaterializerDeps{
|
typ := StreamingHealthServices{deps: MaterializerDeps{
|
||||||
Client: client,
|
Client: client,
|
||||||
Logger: hclog.Default(),
|
Logger: hclog.Default(),
|
||||||
@ -33,8 +35,9 @@ func TestStreamingHealthServices_EmptySnapshot(t *testing.T) {
|
|||||||
Timeout: time.Second,
|
Timeout: time.Second,
|
||||||
}
|
}
|
||||||
req := &structs.ServiceSpecificRequest{
|
req := &structs.ServiceSpecificRequest{
|
||||||
Datacenter: "dc1",
|
Datacenter: "dc1",
|
||||||
ServiceName: "web",
|
ServiceName: "web",
|
||||||
|
EnterpriseMeta: structs.EnterpriseMetaInitializer(namespace),
|
||||||
}
|
}
|
||||||
empty := &structs.IndexedCheckServiceNodes{
|
empty := &structs.IndexedCheckServiceNodes{
|
||||||
Nodes: structs.CheckServiceNodes{},
|
Nodes: structs.CheckServiceNodes{},
|
||||||
@ -215,8 +218,17 @@ func requireResultsSame(t *testing.T, want, got *structs.IndexedCheckServiceNode
|
|||||||
require.ElementsMatch(t, wantIDs, gotIDs)
|
require.ElementsMatch(t, wantIDs, gotIDs)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// getNamespace returns a namespace if namespace support exists, otherwise
|
||||||
|
// returns the empty string. It allows the same tests to work in both oss and ent
|
||||||
|
// without duplicating the tests.
|
||||||
|
func getNamespace(ns string) string {
|
||||||
|
meta := structs.EnterpriseMetaInitializer(ns)
|
||||||
|
return meta.GetNamespace()
|
||||||
|
}
|
||||||
|
|
||||||
func TestStreamingHealthServices_FullSnapshot(t *testing.T) {
|
func TestStreamingHealthServices_FullSnapshot(t *testing.T) {
|
||||||
client := NewTestStreamingClient()
|
namespace := getNamespace("ns2")
|
||||||
|
client := NewTestStreamingClient(namespace)
|
||||||
typ := StreamingHealthServices{deps: MaterializerDeps{
|
typ := StreamingHealthServices{deps: MaterializerDeps{
|
||||||
Client: client,
|
Client: client,
|
||||||
Logger: hclog.Default(),
|
Logger: hclog.Default(),
|
||||||
@ -238,8 +250,9 @@ func TestStreamingHealthServices_FullSnapshot(t *testing.T) {
|
|||||||
Timeout: 1 * time.Second,
|
Timeout: 1 * time.Second,
|
||||||
}
|
}
|
||||||
req := &structs.ServiceSpecificRequest{
|
req := &structs.ServiceSpecificRequest{
|
||||||
Datacenter: "dc1",
|
Datacenter: "dc1",
|
||||||
ServiceName: "web",
|
ServiceName: "web",
|
||||||
|
EnterpriseMeta: structs.EnterpriseMetaInitializer(namespace),
|
||||||
}
|
}
|
||||||
|
|
||||||
gatherNodes := func(res interface{}) []string {
|
gatherNodes := func(res interface{}) []string {
|
||||||
@ -345,7 +358,8 @@ func TestStreamingHealthServices_FullSnapshot(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestStreamingHealthServices_EventBatches(t *testing.T) {
|
func TestStreamingHealthServices_EventBatches(t *testing.T) {
|
||||||
client := NewTestStreamingClient()
|
namespace := getNamespace("ns3")
|
||||||
|
client := NewTestStreamingClient(namespace)
|
||||||
typ := StreamingHealthServices{deps: MaterializerDeps{
|
typ := StreamingHealthServices{deps: MaterializerDeps{
|
||||||
Client: client,
|
Client: client,
|
||||||
Logger: hclog.Default(),
|
Logger: hclog.Default(),
|
||||||
@ -366,8 +380,9 @@ func TestStreamingHealthServices_EventBatches(t *testing.T) {
|
|||||||
Timeout: 1 * time.Second,
|
Timeout: 1 * time.Second,
|
||||||
}
|
}
|
||||||
req := &structs.ServiceSpecificRequest{
|
req := &structs.ServiceSpecificRequest{
|
||||||
Datacenter: "dc1",
|
Datacenter: "dc1",
|
||||||
ServiceName: "web",
|
ServiceName: "web",
|
||||||
|
EnterpriseMeta: structs.EnterpriseMetaInitializer(namespace),
|
||||||
}
|
}
|
||||||
|
|
||||||
gatherNodes := func(res interface{}) []string {
|
gatherNodes := func(res interface{}) []string {
|
||||||
@ -415,7 +430,8 @@ func TestStreamingHealthServices_EventBatches(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestStreamingHealthServices_Filtering(t *testing.T) {
|
func TestStreamingHealthServices_Filtering(t *testing.T) {
|
||||||
client := NewTestStreamingClient()
|
namespace := getNamespace("ns3")
|
||||||
|
client := NewTestStreamingClient(namespace)
|
||||||
typ := StreamingHealthServices{deps: MaterializerDeps{
|
typ := StreamingHealthServices{deps: MaterializerDeps{
|
||||||
Client: client,
|
Client: client,
|
||||||
Logger: hclog.Default(),
|
Logger: hclog.Default(),
|
||||||
@ -436,8 +452,9 @@ func TestStreamingHealthServices_Filtering(t *testing.T) {
|
|||||||
Timeout: 1 * time.Second,
|
Timeout: 1 * time.Second,
|
||||||
}
|
}
|
||||||
req := &structs.ServiceSpecificRequest{
|
req := &structs.ServiceSpecificRequest{
|
||||||
Datacenter: "dc1",
|
Datacenter: "dc1",
|
||||||
ServiceName: "web",
|
ServiceName: "web",
|
||||||
|
EnterpriseMeta: structs.EnterpriseMetaInitializer(namespace),
|
||||||
QueryOptions: structs.QueryOptions{
|
QueryOptions: structs.QueryOptions{
|
||||||
Filter: `Node.Node == "node2"`,
|
Filter: `Node.Node == "node2"`,
|
||||||
},
|
},
|
||||||
|
@ -2,6 +2,7 @@ package cachetype
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"fmt"
|
||||||
|
|
||||||
"google.golang.org/grpc"
|
"google.golang.org/grpc"
|
||||||
|
|
||||||
@ -12,8 +13,9 @@ import (
|
|||||||
// for queueing up custom events to a subscriber.
|
// for queueing up custom events to a subscriber.
|
||||||
type TestStreamingClient struct {
|
type TestStreamingClient struct {
|
||||||
pbsubscribe.StateChangeSubscription_SubscribeClient
|
pbsubscribe.StateChangeSubscription_SubscribeClient
|
||||||
events chan eventOrErr
|
events chan eventOrErr
|
||||||
ctx context.Context
|
ctx context.Context
|
||||||
|
expectedNamespace string
|
||||||
}
|
}
|
||||||
|
|
||||||
type eventOrErr struct {
|
type eventOrErr struct {
|
||||||
@ -21,17 +23,22 @@ type eventOrErr struct {
|
|||||||
Event *pbsubscribe.Event
|
Event *pbsubscribe.Event
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewTestStreamingClient() *TestStreamingClient {
|
func NewTestStreamingClient(ns string) *TestStreamingClient {
|
||||||
return &TestStreamingClient{
|
return &TestStreamingClient{
|
||||||
events: make(chan eventOrErr, 32),
|
events: make(chan eventOrErr, 32),
|
||||||
|
expectedNamespace: ns,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *TestStreamingClient) Subscribe(
|
func (t *TestStreamingClient) Subscribe(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
_ *pbsubscribe.SubscribeRequest,
|
req *pbsubscribe.SubscribeRequest,
|
||||||
_ ...grpc.CallOption,
|
_ ...grpc.CallOption,
|
||||||
) (pbsubscribe.StateChangeSubscription_SubscribeClient, error) {
|
) (pbsubscribe.StateChangeSubscription_SubscribeClient, error) {
|
||||||
|
if req.Namespace != t.expectedNamespace {
|
||||||
|
return nil, fmt.Errorf("wrong SubscribeRequest.Namespace %v, expected %v",
|
||||||
|
req.Namespace, t.expectedNamespace)
|
||||||
|
}
|
||||||
t.ctx = ctx
|
t.ctx = ctx
|
||||||
return t, nil
|
return t, nil
|
||||||
}
|
}
|
||||||
|
@ -32,7 +32,8 @@ func (e EventPayloadCheckServiceNode) FilterByKey(key, namespace string) bool {
|
|||||||
if e.key != "" {
|
if e.key != "" {
|
||||||
name = e.key
|
name = e.key
|
||||||
}
|
}
|
||||||
return key == name && namespace == e.Value.Service.EnterpriseMeta.GetNamespace()
|
ns := e.Value.Service.EnterpriseMeta.GetNamespace()
|
||||||
|
return (key == "" || key == name) && (namespace == "" || namespace == ns)
|
||||||
}
|
}
|
||||||
|
|
||||||
// serviceHealthSnapshot returns a stream.SnapshotFunc that provides a snapshot
|
// serviceHealthSnapshot returns a stream.SnapshotFunc that provides a snapshot
|
||||||
@ -45,8 +46,8 @@ func serviceHealthSnapshot(db ReadDB, topic stream.Topic) stream.SnapshotFunc {
|
|||||||
defer tx.Abort()
|
defer tx.Abort()
|
||||||
|
|
||||||
connect := topic == topicServiceHealthConnect
|
connect := topic == topicServiceHealthConnect
|
||||||
// TODO(namespace-streaming): plumb entMeta through from SubscribeRequest
|
entMeta := structs.EnterpriseMetaInitializer(req.Namespace)
|
||||||
idx, nodes, err := checkServiceNodesTxn(tx, nil, req.Key, connect, nil)
|
idx, nodes, err := checkServiceNodesTxn(tx, nil, req.Key, connect, &entMeta)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, err
|
return 0, err
|
||||||
}
|
}
|
||||||
@ -349,8 +350,7 @@ func getPayloadCheckServiceNode(payload stream.Payload) *structs.CheckServiceNod
|
|||||||
// parseCheckServiceNodes but is more efficient since we know they are all on
|
// parseCheckServiceNodes but is more efficient since we know they are all on
|
||||||
// the same node.
|
// the same node.
|
||||||
func newServiceHealthEventsForNode(tx ReadTxn, idx uint64, node string) ([]stream.Event, error) {
|
func newServiceHealthEventsForNode(tx ReadTxn, idx uint64, node string) ([]stream.Event, error) {
|
||||||
// TODO(namespace-streaming): figure out the right EntMeta and mystery arg.
|
services, err := catalogServiceListByNode(tx, node, structs.WildcardEnterpriseMeta(), true)
|
||||||
services, err := catalogServiceListByNode(tx, node, nil, false)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -384,8 +384,7 @@ func getNodeAndChecks(tx ReadTxn, node string) (*structs.Node, serviceChecksFunc
|
|||||||
}
|
}
|
||||||
n := nodeRaw.(*structs.Node)
|
n := nodeRaw.(*structs.Node)
|
||||||
|
|
||||||
// TODO(namespace-streaming): work out what EntMeta is needed here, wildcard?
|
iter, err := catalogListChecksByNode(tx, node, structs.WildcardEnterpriseMeta())
|
||||||
iter, err := catalogListChecksByNode(tx, node, nil)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, err
|
return nil, nil, err
|
||||||
}
|
}
|
||||||
|
@ -8,6 +8,8 @@ import (
|
|||||||
"github.com/google/go-cmp/cmp/cmpopts"
|
"github.com/google/go-cmp/cmp/cmpopts"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
|
|
||||||
|
"github.com/hashicorp/consul/proto/pbcommon"
|
||||||
|
|
||||||
"github.com/hashicorp/consul/agent/consul/stream"
|
"github.com/hashicorp/consul/agent/consul/stream"
|
||||||
"github.com/hashicorp/consul/agent/structs"
|
"github.com/hashicorp/consul/agent/structs"
|
||||||
"github.com/hashicorp/consul/api"
|
"github.com/hashicorp/consul/api"
|
||||||
@ -1459,3 +1461,98 @@ func newTestEventServiceHealthDeregister(index uint64, nodeNum int, svc string)
|
|||||||
},
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestEventPayloadCheckServiceNode_FilterByKey(t *testing.T) {
|
||||||
|
type testCase struct {
|
||||||
|
name string
|
||||||
|
payload EventPayloadCheckServiceNode
|
||||||
|
key string
|
||||||
|
namespace string
|
||||||
|
expected bool
|
||||||
|
}
|
||||||
|
|
||||||
|
fn := func(t *testing.T, tc testCase) {
|
||||||
|
if tc.namespace != "" && pbcommon.DefaultEnterpriseMeta.Namespace == "" {
|
||||||
|
t.Skip("cant test namespace matching without namespace support")
|
||||||
|
}
|
||||||
|
|
||||||
|
require.Equal(t, tc.expected, tc.payload.FilterByKey(tc.key, tc.namespace))
|
||||||
|
}
|
||||||
|
|
||||||
|
var testCases = []testCase{
|
||||||
|
{
|
||||||
|
name: "no key or namespace",
|
||||||
|
payload: newPayloadCheckServiceNode("srv1", "ns1"),
|
||||||
|
expected: true,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "no key, with namespace match",
|
||||||
|
payload: newPayloadCheckServiceNode("srv1", "ns1"),
|
||||||
|
namespace: "ns1",
|
||||||
|
expected: true,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "no namespace, with key match",
|
||||||
|
payload: newPayloadCheckServiceNode("srv1", "ns1"),
|
||||||
|
key: "srv1",
|
||||||
|
expected: true,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "key match, namespace mismatch",
|
||||||
|
payload: newPayloadCheckServiceNode("srv1", "ns1"),
|
||||||
|
key: "srv1",
|
||||||
|
namespace: "ns2",
|
||||||
|
expected: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "key mismatch, namespace match",
|
||||||
|
payload: newPayloadCheckServiceNode("srv1", "ns1"),
|
||||||
|
key: "srv2",
|
||||||
|
namespace: "ns1",
|
||||||
|
expected: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "override key match",
|
||||||
|
payload: newPayloadCheckServiceNodeWithKey("proxy", "ns1", "srv1"),
|
||||||
|
key: "srv1",
|
||||||
|
namespace: "ns1",
|
||||||
|
expected: true,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "override key match",
|
||||||
|
payload: newPayloadCheckServiceNodeWithKey("proxy", "ns1", "srv2"),
|
||||||
|
key: "proxy",
|
||||||
|
namespace: "ns1",
|
||||||
|
expected: false,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tc := range testCases {
|
||||||
|
t.Run(tc.name, func(t *testing.T) {
|
||||||
|
fn(t, tc)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func newPayloadCheckServiceNode(service, namespace string) EventPayloadCheckServiceNode {
|
||||||
|
return EventPayloadCheckServiceNode{
|
||||||
|
Value: &structs.CheckServiceNode{
|
||||||
|
Service: &structs.NodeService{
|
||||||
|
Service: service,
|
||||||
|
EnterpriseMeta: structs.EnterpriseMetaInitializer(namespace),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func newPayloadCheckServiceNodeWithKey(service, namespace, key string) EventPayloadCheckServiceNode {
|
||||||
|
return EventPayloadCheckServiceNode{
|
||||||
|
Value: &structs.CheckServiceNode{
|
||||||
|
Service: &structs.NodeService{
|
||||||
|
Service: service,
|
||||||
|
EnterpriseMeta: structs.EnterpriseMetaInitializer(namespace),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
key: key,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@ -267,7 +267,7 @@ func (e *EventPublisher) getCachedSnapshotLocked(req *SubscribeRequest) *eventSn
|
|||||||
e.snapCache[req.Topic] = topicSnaps
|
e.snapCache[req.Topic] = topicSnaps
|
||||||
}
|
}
|
||||||
|
|
||||||
snap, ok := topicSnaps[req.Key]
|
snap, ok := topicSnaps[snapCacheKey(req)]
|
||||||
if ok && snap.err() == nil {
|
if ok && snap.err() == nil {
|
||||||
return snap
|
return snap
|
||||||
}
|
}
|
||||||
@ -279,12 +279,16 @@ func (e *EventPublisher) setCachedSnapshotLocked(req *SubscribeRequest, snap *ev
|
|||||||
if e.snapCacheTTL == 0 {
|
if e.snapCacheTTL == 0 {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
e.snapCache[req.Topic][req.Key] = snap
|
e.snapCache[req.Topic][snapCacheKey(req)] = snap
|
||||||
|
|
||||||
// Setup a cache eviction
|
// Setup a cache eviction
|
||||||
time.AfterFunc(e.snapCacheTTL, func() {
|
time.AfterFunc(e.snapCacheTTL, func() {
|
||||||
e.lock.Lock()
|
e.lock.Lock()
|
||||||
defer e.lock.Unlock()
|
defer e.lock.Unlock()
|
||||||
delete(e.snapCache[req.Topic], req.Key)
|
delete(e.snapCache[req.Topic], snapCacheKey(req))
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func snapCacheKey(req *SubscribeRequest) string {
|
||||||
|
return fmt.Sprintf(req.Namespace + "/" + req.Key)
|
||||||
|
}
|
||||||
|
@ -134,7 +134,7 @@ func newEventFromBatch(req SubscribeRequest, events []Event) Event {
|
|||||||
|
|
||||||
func filterByKey(req SubscribeRequest, events []Event) (Event, bool) {
|
func filterByKey(req SubscribeRequest, events []Event) (Event, bool) {
|
||||||
event := newEventFromBatch(req, events)
|
event := newEventFromBatch(req, events)
|
||||||
if req.Key == "" {
|
if req.Key == "" && req.Namespace == "" {
|
||||||
return event, true
|
return event, true
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -138,63 +138,147 @@ func publishTestEvent(index uint64, b *eventBuffer, key string) {
|
|||||||
b.Append([]Event{e})
|
b.Append([]Event{e})
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestFilter_NoKey(t *testing.T) {
|
|
||||||
events := make(PayloadEvents, 0, 5)
|
|
||||||
events = append(events, newSimpleEvent("One", 102), newSimpleEvent("Two", 102))
|
|
||||||
|
|
||||||
req := SubscribeRequest{Topic: testTopic}
|
|
||||||
actual, ok := filterByKey(req, events)
|
|
||||||
require.True(t, ok)
|
|
||||||
require.Equal(t, Event{Topic: testTopic, Index: 102, Payload: events}, actual)
|
|
||||||
|
|
||||||
// test that a new array was not allocated
|
|
||||||
require.Equal(t, cap(actual.Payload.(PayloadEvents)), 5)
|
|
||||||
}
|
|
||||||
|
|
||||||
func newSimpleEvent(key string, index uint64) Event {
|
func newSimpleEvent(key string, index uint64) Event {
|
||||||
return Event{Index: index, Payload: simplePayload{key: key}}
|
return Event{Index: index, Payload: simplePayload{key: key}}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestFilter_WithKey_AllEventsMatch(t *testing.T) {
|
func TestFilterByKey(t *testing.T) {
|
||||||
events := make(PayloadEvents, 0, 5)
|
type testCase struct {
|
||||||
events = append(events, newSimpleEvent("Same", 103), newSimpleEvent("Same", 103))
|
name string
|
||||||
|
req SubscribeRequest
|
||||||
req := SubscribeRequest{Topic: testTopic, Key: "Same"}
|
events []Event
|
||||||
actual, ok := filterByKey(req, events)
|
expectEvent bool
|
||||||
require.True(t, ok)
|
expected Event
|
||||||
expected := Event{Topic: testTopic, Index: 103, Payload: events}
|
expectedCap int
|
||||||
require.Equal(t, expected, actual)
|
|
||||||
|
|
||||||
// test that a new array was not allocated
|
|
||||||
require.Equal(t, 5, cap(actual.Payload.(PayloadEvents)))
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestFilter_WithKey_SomeEventsMatch(t *testing.T) {
|
|
||||||
events := make([]Event, 0, 5)
|
|
||||||
events = append(events,
|
|
||||||
newSimpleEvent("Same", 104),
|
|
||||||
newSimpleEvent("Other", 0),
|
|
||||||
newSimpleEvent("Same", 0))
|
|
||||||
|
|
||||||
req := SubscribeRequest{Topic: testTopic, Key: "Same"}
|
|
||||||
actual, ok := filterByKey(req, events)
|
|
||||||
require.True(t, ok)
|
|
||||||
expected := Event{
|
|
||||||
Topic: testTopic,
|
|
||||||
Index: 104,
|
|
||||||
Payload: PayloadEvents{newSimpleEvent("Same", 104), newSimpleEvent("Same", 0)},
|
|
||||||
}
|
}
|
||||||
require.Equal(t, expected, actual)
|
|
||||||
|
|
||||||
// test that a new array was allocated with the correct size
|
fn := func(t *testing.T, tc testCase) {
|
||||||
require.Equal(t, cap(actual.Payload.(PayloadEvents)), 2)
|
events := make(PayloadEvents, 0, 5)
|
||||||
|
events = append(events, tc.events...)
|
||||||
|
|
||||||
|
actual, ok := filterByKey(tc.req, events)
|
||||||
|
require.Equal(t, tc.expectEvent, ok)
|
||||||
|
if !tc.expectEvent {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
require.Equal(t, tc.expected, actual)
|
||||||
|
// test if there was a new array allocated or not
|
||||||
|
require.Equal(t, tc.expectedCap, cap(actual.Payload.(PayloadEvents)))
|
||||||
|
}
|
||||||
|
|
||||||
|
var testCases = []testCase{
|
||||||
|
{
|
||||||
|
name: "all events match, no key or namespace",
|
||||||
|
req: SubscribeRequest{Topic: testTopic},
|
||||||
|
events: []Event{
|
||||||
|
newSimpleEvent("One", 102),
|
||||||
|
newSimpleEvent("Two", 102)},
|
||||||
|
expectEvent: true,
|
||||||
|
expected: Event{
|
||||||
|
Topic: testTopic,
|
||||||
|
Index: 102,
|
||||||
|
Payload: PayloadEvents{
|
||||||
|
newSimpleEvent("One", 102),
|
||||||
|
newSimpleEvent("Two", 102)}},
|
||||||
|
expectedCap: 5,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "all events match, no namespace",
|
||||||
|
req: SubscribeRequest{Topic: testTopic, Key: "Same"},
|
||||||
|
events: []Event{
|
||||||
|
newSimpleEvent("Same", 103),
|
||||||
|
newSimpleEvent("Same", 103)},
|
||||||
|
expectEvent: true,
|
||||||
|
expected: Event{
|
||||||
|
Topic: testTopic,
|
||||||
|
Index: 103,
|
||||||
|
Payload: PayloadEvents{
|
||||||
|
newSimpleEvent("Same", 103),
|
||||||
|
newSimpleEvent("Same", 103)}},
|
||||||
|
expectedCap: 5,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "all events match, no key",
|
||||||
|
req: SubscribeRequest{Topic: testTopic, Namespace: "apps"},
|
||||||
|
events: []Event{
|
||||||
|
newNSEvent("Something", "apps"),
|
||||||
|
newNSEvent("Other", "apps")},
|
||||||
|
expectEvent: true,
|
||||||
|
expected: Event{
|
||||||
|
Topic: testTopic,
|
||||||
|
Index: 22,
|
||||||
|
Payload: PayloadEvents{
|
||||||
|
newNSEvent("Something", "apps"),
|
||||||
|
newNSEvent("Other", "apps")}},
|
||||||
|
expectedCap: 5,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "some evens match, no namespace",
|
||||||
|
req: SubscribeRequest{Topic: testTopic, Key: "Same"},
|
||||||
|
events: []Event{
|
||||||
|
newSimpleEvent("Same", 104),
|
||||||
|
newSimpleEvent("Other", 104),
|
||||||
|
newSimpleEvent("Same", 104)},
|
||||||
|
expectEvent: true,
|
||||||
|
expected: Event{
|
||||||
|
Topic: testTopic,
|
||||||
|
Index: 104,
|
||||||
|
Payload: PayloadEvents{
|
||||||
|
newSimpleEvent("Same", 104),
|
||||||
|
newSimpleEvent("Same", 104)}},
|
||||||
|
expectedCap: 2,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "some events match, no key",
|
||||||
|
req: SubscribeRequest{Topic: testTopic, Namespace: "apps"},
|
||||||
|
events: []Event{
|
||||||
|
newNSEvent("app1", "apps"),
|
||||||
|
newNSEvent("db1", "dbs"),
|
||||||
|
newNSEvent("app2", "apps")},
|
||||||
|
expectEvent: true,
|
||||||
|
expected: Event{
|
||||||
|
Topic: testTopic,
|
||||||
|
Index: 22,
|
||||||
|
Payload: PayloadEvents{
|
||||||
|
newNSEvent("app1", "apps"),
|
||||||
|
newNSEvent("app2", "apps")}},
|
||||||
|
expectedCap: 2,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "no events match key",
|
||||||
|
req: SubscribeRequest{Topic: testTopic, Key: "Other"},
|
||||||
|
events: []Event{
|
||||||
|
newSimpleEvent("Same", 0),
|
||||||
|
newSimpleEvent("Same", 0)},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "no events match namespace",
|
||||||
|
req: SubscribeRequest{Topic: testTopic, Namespace: "apps"},
|
||||||
|
events: []Event{
|
||||||
|
newNSEvent("app1", "group1"),
|
||||||
|
newNSEvent("app2", "group2")},
|
||||||
|
expectEvent: false,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tc := range testCases {
|
||||||
|
t.Run(tc.name, func(t *testing.T) {
|
||||||
|
fn(t, tc)
|
||||||
|
})
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestFilter_WithKey_NoEventsMatch(t *testing.T) {
|
func newNSEvent(key, namespace string) Event {
|
||||||
events := make([]Event, 0, 5)
|
return Event{Index: 22, Payload: nsPayload{key: key, namespace: namespace}}
|
||||||
events = append(events, newSimpleEvent("Same", 0), newSimpleEvent("Same", 0))
|
}
|
||||||
|
|
||||||
req := SubscribeRequest{Topic: testTopic, Key: "Other"}
|
type nsPayload struct {
|
||||||
_, ok := filterByKey(req, events)
|
key string
|
||||||
require.False(t, ok)
|
namespace string
|
||||||
|
value string
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p nsPayload) FilterByKey(key, namespace string) bool {
|
||||||
|
return (key == "" || key == p.key) && (namespace == "" || namespace == p.namespace)
|
||||||
}
|
}
|
||||||
|
@ -7,6 +7,7 @@ import (
|
|||||||
"github.com/hashicorp/consul/agent/consul/stream"
|
"github.com/hashicorp/consul/agent/consul/stream"
|
||||||
agentgrpc "github.com/hashicorp/consul/agent/grpc"
|
agentgrpc "github.com/hashicorp/consul/agent/grpc"
|
||||||
"github.com/hashicorp/consul/agent/rpc/subscribe"
|
"github.com/hashicorp/consul/agent/rpc/subscribe"
|
||||||
|
"github.com/hashicorp/consul/agent/structs"
|
||||||
)
|
)
|
||||||
|
|
||||||
type subscribeBackend struct {
|
type subscribeBackend struct {
|
||||||
@ -16,8 +17,12 @@ type subscribeBackend struct {
|
|||||||
|
|
||||||
// TODO: refactor Resolve methods to an ACLBackend that can be used by all
|
// TODO: refactor Resolve methods to an ACLBackend that can be used by all
|
||||||
// the endpoints.
|
// the endpoints.
|
||||||
func (s subscribeBackend) ResolveToken(token string) (acl.Authorizer, error) {
|
func (s subscribeBackend) ResolveTokenAndDefaultMeta(
|
||||||
return s.srv.ResolveToken(token)
|
token string,
|
||||||
|
entMeta *structs.EnterpriseMeta,
|
||||||
|
authzContext *acl.AuthorizerContext,
|
||||||
|
) (acl.Authorizer, error) {
|
||||||
|
return s.srv.ResolveTokenAndDefaultMeta(token, entMeta, authzContext)
|
||||||
}
|
}
|
||||||
|
|
||||||
var _ subscribe.Backend = (*subscribeBackend)(nil)
|
var _ subscribe.Backend = (*subscribeBackend)(nil)
|
||||||
|
@ -37,11 +37,12 @@ func (s *streamID) String() string {
|
|||||||
return s.id
|
return s.id
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *Server) newLoggerForRequest(req *pbsubscribe.SubscribeRequest) Logger {
|
func newLoggerForRequest(l Logger, req *pbsubscribe.SubscribeRequest) Logger {
|
||||||
return h.Logger.With(
|
return l.With(
|
||||||
"topic", req.Topic.String(),
|
"topic", req.Topic.String(),
|
||||||
"dc", req.Datacenter,
|
"dc", req.Datacenter,
|
||||||
"key", req.Key,
|
"key", req.Key,
|
||||||
|
"namespace", req.Namespace,
|
||||||
"index", req.Index,
|
"index", req.Index,
|
||||||
"stream_id", &streamID{})
|
"stream_id", &streamID{})
|
||||||
}
|
}
|
||||||
|
@ -12,6 +12,7 @@ import (
|
|||||||
"github.com/hashicorp/consul/acl"
|
"github.com/hashicorp/consul/acl"
|
||||||
"github.com/hashicorp/consul/agent/consul/state"
|
"github.com/hashicorp/consul/agent/consul/state"
|
||||||
"github.com/hashicorp/consul/agent/consul/stream"
|
"github.com/hashicorp/consul/agent/consul/stream"
|
||||||
|
"github.com/hashicorp/consul/agent/structs"
|
||||||
"github.com/hashicorp/consul/proto/pbservice"
|
"github.com/hashicorp/consul/proto/pbservice"
|
||||||
"github.com/hashicorp/consul/proto/pbsubscribe"
|
"github.com/hashicorp/consul/proto/pbsubscribe"
|
||||||
)
|
)
|
||||||
@ -35,15 +36,13 @@ type Logger interface {
|
|||||||
var _ pbsubscribe.StateChangeSubscriptionServer = (*Server)(nil)
|
var _ pbsubscribe.StateChangeSubscriptionServer = (*Server)(nil)
|
||||||
|
|
||||||
type Backend interface {
|
type Backend interface {
|
||||||
// TODO(streaming): Use ResolveTokenAndDefaultMeta instead once SubscribeRequest
|
ResolveTokenAndDefaultMeta(token string, entMeta *structs.EnterpriseMeta, authzContext *acl.AuthorizerContext) (acl.Authorizer, error)
|
||||||
// has an EnterpriseMeta.
|
|
||||||
ResolveToken(token string) (acl.Authorizer, error)
|
|
||||||
Forward(dc string, f func(*grpc.ClientConn) error) (handled bool, err error)
|
Forward(dc string, f func(*grpc.ClientConn) error) (handled bool, err error)
|
||||||
Subscribe(req *stream.SubscribeRequest) (*stream.Subscription, error)
|
Subscribe(req *stream.SubscribeRequest) (*stream.Subscription, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *Server) Subscribe(req *pbsubscribe.SubscribeRequest, serverStream pbsubscribe.StateChangeSubscription_SubscribeServer) error {
|
func (h *Server) Subscribe(req *pbsubscribe.SubscribeRequest, serverStream pbsubscribe.StateChangeSubscription_SubscribeServer) error {
|
||||||
logger := h.newLoggerForRequest(req)
|
logger := newLoggerForRequest(h.Logger, req)
|
||||||
handled, err := h.Backend.Forward(req.Datacenter, forwardToDC(req, serverStream, logger))
|
handled, err := h.Backend.Forward(req.Datacenter, forwardToDC(req, serverStream, logger))
|
||||||
if handled || err != nil {
|
if handled || err != nil {
|
||||||
return err
|
return err
|
||||||
@ -52,13 +51,13 @@ func (h *Server) Subscribe(req *pbsubscribe.SubscribeRequest, serverStream pbsub
|
|||||||
logger.Trace("new subscription")
|
logger.Trace("new subscription")
|
||||||
defer logger.Trace("subscription closed")
|
defer logger.Trace("subscription closed")
|
||||||
|
|
||||||
// Resolve the token and create the ACL filter.
|
entMeta := structs.EnterpriseMetaInitializer(req.Namespace)
|
||||||
authz, err := h.Backend.ResolveToken(req.Token)
|
authz, err := h.Backend.ResolveTokenAndDefaultMeta(req.Token, &entMeta, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
sub, err := h.Backend.Subscribe(toStreamSubscribeRequest(req))
|
sub, err := h.Backend.Subscribe(toStreamSubscribeRequest(req, entMeta))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -90,13 +89,13 @@ func (h *Server) Subscribe(req *pbsubscribe.SubscribeRequest, serverStream pbsub
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: can be replaced by mog conversion
|
func toStreamSubscribeRequest(req *pbsubscribe.SubscribeRequest, entMeta structs.EnterpriseMeta) *stream.SubscribeRequest {
|
||||||
func toStreamSubscribeRequest(req *pbsubscribe.SubscribeRequest) *stream.SubscribeRequest {
|
|
||||||
return &stream.SubscribeRequest{
|
return &stream.SubscribeRequest{
|
||||||
Topic: req.Topic,
|
Topic: req.Topic,
|
||||||
Key: req.Key,
|
Key: req.Key,
|
||||||
Token: req.Token,
|
Token: req.Token,
|
||||||
Index: req.Index,
|
Index: req.Index,
|
||||||
|
Namespace: entMeta.GetNamespace(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -93,8 +93,9 @@ func TestServer_Subscribe_IntegrationWithBackend(t *testing.T) {
|
|||||||
runStep(t, "setup a client and subscribe to a topic", func(t *testing.T) {
|
runStep(t, "setup a client and subscribe to a topic", func(t *testing.T) {
|
||||||
streamClient := pbsubscribe.NewStateChangeSubscriptionClient(conn)
|
streamClient := pbsubscribe.NewStateChangeSubscriptionClient(conn)
|
||||||
streamHandle, err := streamClient.Subscribe(ctx, &pbsubscribe.SubscribeRequest{
|
streamHandle, err := streamClient.Subscribe(ctx, &pbsubscribe.SubscribeRequest{
|
||||||
Topic: pbsubscribe.Topic_ServiceHealth,
|
Topic: pbsubscribe.Topic_ServiceHealth,
|
||||||
Key: "redis",
|
Key: "redis",
|
||||||
|
Namespace: pbcommon.DefaultEnterpriseMeta.Namespace,
|
||||||
})
|
})
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
@ -130,7 +131,7 @@ func TestServer_Subscribe_IntegrationWithBackend(t *testing.T) {
|
|||||||
Expose: pbservice.ExposeConfig{},
|
Expose: pbservice.ExposeConfig{},
|
||||||
},
|
},
|
||||||
RaftIndex: raftIndex(ids, "reg2", "reg2"),
|
RaftIndex: raftIndex(ids, "reg2", "reg2"),
|
||||||
EnterpriseMeta: pbcommon.EnterpriseMeta{},
|
EnterpriseMeta: pbcommon.DefaultEnterpriseMeta,
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
@ -160,7 +161,7 @@ func TestServer_Subscribe_IntegrationWithBackend(t *testing.T) {
|
|||||||
Expose: pbservice.ExposeConfig{},
|
Expose: pbservice.ExposeConfig{},
|
||||||
},
|
},
|
||||||
RaftIndex: raftIndex(ids, "reg3", "reg3"),
|
RaftIndex: raftIndex(ids, "reg3", "reg3"),
|
||||||
EnterpriseMeta: pbcommon.EnterpriseMeta{},
|
EnterpriseMeta: pbcommon.DefaultEnterpriseMeta,
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
@ -209,7 +210,7 @@ func TestServer_Subscribe_IntegrationWithBackend(t *testing.T) {
|
|||||||
Expose: pbservice.ExposeConfig{},
|
Expose: pbservice.ExposeConfig{},
|
||||||
},
|
},
|
||||||
RaftIndex: raftIndex(ids, "reg3", "reg3"),
|
RaftIndex: raftIndex(ids, "reg3", "reg3"),
|
||||||
EnterpriseMeta: pbcommon.EnterpriseMeta{},
|
EnterpriseMeta: pbcommon.DefaultEnterpriseMeta,
|
||||||
},
|
},
|
||||||
Checks: []*pbservice.HealthCheck{
|
Checks: []*pbservice.HealthCheck{
|
||||||
{
|
{
|
||||||
@ -220,7 +221,7 @@ func TestServer_Subscribe_IntegrationWithBackend(t *testing.T) {
|
|||||||
ServiceID: "redis1",
|
ServiceID: "redis1",
|
||||||
ServiceName: "redis",
|
ServiceName: "redis",
|
||||||
RaftIndex: raftIndex(ids, "update", "update"),
|
RaftIndex: raftIndex(ids, "update", "update"),
|
||||||
EnterpriseMeta: pbcommon.EnterpriseMeta{},
|
EnterpriseMeta: pbcommon.DefaultEnterpriseMeta,
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
@ -261,7 +262,7 @@ func getEvent(t *testing.T, ch chan eventOrError) *pbsubscribe.Event {
|
|||||||
case item := <-ch:
|
case item := <-ch:
|
||||||
require.NoError(t, item.err)
|
require.NoError(t, item.err)
|
||||||
return item.event
|
return item.event
|
||||||
case <-time.After(10 * time.Second):
|
case <-time.After(2 * time.Second):
|
||||||
t.Fatalf("timeout waiting on event from server")
|
t.Fatalf("timeout waiting on event from server")
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
@ -280,7 +281,11 @@ type testBackend struct {
|
|||||||
forwardConn *gogrpc.ClientConn
|
forwardConn *gogrpc.ClientConn
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b testBackend) ResolveToken(token string) (acl.Authorizer, error) {
|
func (b testBackend) ResolveTokenAndDefaultMeta(
|
||||||
|
token string,
|
||||||
|
_ *structs.EnterpriseMeta,
|
||||||
|
_ *acl.AuthorizerContext,
|
||||||
|
) (acl.Authorizer, error) {
|
||||||
return b.authorizer(token), nil
|
return b.authorizer(token), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -440,6 +445,7 @@ func TestServer_Subscribe_IntegrationWithBackend_ForwardToDC(t *testing.T) {
|
|||||||
Topic: pbsubscribe.Topic_ServiceHealth,
|
Topic: pbsubscribe.Topic_ServiceHealth,
|
||||||
Key: "redis",
|
Key: "redis",
|
||||||
Datacenter: "dc2",
|
Datacenter: "dc2",
|
||||||
|
Namespace: pbcommon.DefaultEnterpriseMeta.Namespace,
|
||||||
})
|
})
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
go recvEvents(chEvents, streamHandle)
|
go recvEvents(chEvents, streamHandle)
|
||||||
@ -474,7 +480,7 @@ func TestServer_Subscribe_IntegrationWithBackend_ForwardToDC(t *testing.T) {
|
|||||||
MeshGateway: pbservice.MeshGatewayConfig{},
|
MeshGateway: pbservice.MeshGatewayConfig{},
|
||||||
Expose: pbservice.ExposeConfig{},
|
Expose: pbservice.ExposeConfig{},
|
||||||
},
|
},
|
||||||
EnterpriseMeta: pbcommon.EnterpriseMeta{},
|
EnterpriseMeta: pbcommon.DefaultEnterpriseMeta,
|
||||||
RaftIndex: raftIndex(ids, "reg2", "reg2"),
|
RaftIndex: raftIndex(ids, "reg2", "reg2"),
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
@ -504,7 +510,7 @@ func TestServer_Subscribe_IntegrationWithBackend_ForwardToDC(t *testing.T) {
|
|||||||
MeshGateway: pbservice.MeshGatewayConfig{},
|
MeshGateway: pbservice.MeshGatewayConfig{},
|
||||||
Expose: pbservice.ExposeConfig{},
|
Expose: pbservice.ExposeConfig{},
|
||||||
},
|
},
|
||||||
EnterpriseMeta: pbcommon.EnterpriseMeta{},
|
EnterpriseMeta: pbcommon.DefaultEnterpriseMeta,
|
||||||
RaftIndex: raftIndex(ids, "reg3", "reg3"),
|
RaftIndex: raftIndex(ids, "reg3", "reg3"),
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
@ -554,7 +560,7 @@ func TestServer_Subscribe_IntegrationWithBackend_ForwardToDC(t *testing.T) {
|
|||||||
MeshGateway: pbservice.MeshGatewayConfig{},
|
MeshGateway: pbservice.MeshGatewayConfig{},
|
||||||
Expose: pbservice.ExposeConfig{},
|
Expose: pbservice.ExposeConfig{},
|
||||||
},
|
},
|
||||||
EnterpriseMeta: pbcommon.EnterpriseMeta{},
|
EnterpriseMeta: pbcommon.DefaultEnterpriseMeta,
|
||||||
},
|
},
|
||||||
Checks: []*pbservice.HealthCheck{
|
Checks: []*pbservice.HealthCheck{
|
||||||
{
|
{
|
||||||
@ -565,7 +571,7 @@ func TestServer_Subscribe_IntegrationWithBackend_ForwardToDC(t *testing.T) {
|
|||||||
ServiceID: "redis1",
|
ServiceID: "redis1",
|
||||||
ServiceName: "redis",
|
ServiceName: "redis",
|
||||||
RaftIndex: raftIndex(ids, "update", "update"),
|
RaftIndex: raftIndex(ids, "update", "update"),
|
||||||
EnterpriseMeta: pbcommon.EnterpriseMeta{},
|
EnterpriseMeta: pbcommon.DefaultEnterpriseMeta,
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
@ -595,10 +601,8 @@ node "node1" {
|
|||||||
policy = "write"
|
policy = "write"
|
||||||
}
|
}
|
||||||
`
|
`
|
||||||
authorizer, err := acl.NewAuthorizerFromRules(
|
cfg := &acl.Config{WildcardName: structs.WildcardSpecifier}
|
||||||
"1", 0, rules, acl.SyntaxCurrent,
|
authorizer, err := acl.NewAuthorizerFromRules("1", 0, rules, acl.SyntaxCurrent, cfg, nil)
|
||||||
&acl.Config{WildcardName: structs.WildcardSpecifier},
|
|
||||||
nil)
|
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
authorizer = acl.NewChainedAuthorizer([]acl.Authorizer{authorizer, acl.DenyAll()})
|
authorizer = acl.NewChainedAuthorizer([]acl.Authorizer{authorizer, acl.DenyAll()})
|
||||||
require.Equal(t, acl.Deny, authorizer.NodeRead("denied", nil))
|
require.Equal(t, acl.Deny, authorizer.NodeRead("denied", nil))
|
||||||
@ -676,9 +680,10 @@ node "node1" {
|
|||||||
|
|
||||||
runStep(t, "setup a client, subscribe to a topic, and receive a snapshot", func(t *testing.T) {
|
runStep(t, "setup a client, subscribe to a topic, and receive a snapshot", func(t *testing.T) {
|
||||||
streamHandle, err := streamClient.Subscribe(ctx, &pbsubscribe.SubscribeRequest{
|
streamHandle, err := streamClient.Subscribe(ctx, &pbsubscribe.SubscribeRequest{
|
||||||
Topic: pbsubscribe.Topic_ServiceHealth,
|
Topic: pbsubscribe.Topic_ServiceHealth,
|
||||||
Key: "foo",
|
Key: "foo",
|
||||||
Token: token,
|
Token: token,
|
||||||
|
Namespace: pbcommon.DefaultEnterpriseMeta.Namespace,
|
||||||
})
|
})
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
@ -1626,9 +1626,8 @@ func (csn *CheckServiceNode) CanRead(authz acl.Authorizer) acl.EnforcementDecisi
|
|||||||
return acl.Deny
|
return acl.Deny
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO(streaming): add enterprise test that uses namespaces
|
|
||||||
authzContext := new(acl.AuthorizerContext)
|
authzContext := new(acl.AuthorizerContext)
|
||||||
csn.Service.FillAuthzContext(authzContext)
|
csn.Service.EnterpriseMeta.FillAuthzContext(authzContext)
|
||||||
|
|
||||||
if authz.NodeRead(csn.Node.Node, authzContext) != acl.Allow {
|
if authz.NodeRead(csn.Node.Node, authzContext) != acl.Allow {
|
||||||
return acl.Deny
|
return acl.Deny
|
||||||
|
@ -1290,7 +1290,7 @@ func TestCheckServiceNodes_Filter(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestCheckServiceNodes_CanRead(t *testing.T) {
|
func TestCheckServiceNode_CanRead(t *testing.T) {
|
||||||
type testCase struct {
|
type testCase struct {
|
||||||
name string
|
name string
|
||||||
csn CheckServiceNode
|
csn CheckServiceNode
|
||||||
|
5
proto/pbcommon/common_oss.go
Normal file
5
proto/pbcommon/common_oss.go
Normal file
@ -0,0 +1,5 @@
|
|||||||
|
// +build !consulent
|
||||||
|
|
||||||
|
package pbcommon
|
||||||
|
|
||||||
|
var DefaultEnterpriseMeta = EnterpriseMeta{}
|
@ -1,3 +1,5 @@
|
|||||||
|
// +build !consulent
|
||||||
|
|
||||||
package pbservice
|
package pbservice
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
12
proto/pbservice/convert_oss_test.go
Normal file
12
proto/pbservice/convert_oss_test.go
Normal file
@ -0,0 +1,12 @@
|
|||||||
|
// +build !consulent
|
||||||
|
|
||||||
|
package pbservice
|
||||||
|
|
||||||
|
import (
|
||||||
|
fuzz "github.com/google/gofuzz"
|
||||||
|
|
||||||
|
"github.com/hashicorp/consul/agent/structs"
|
||||||
|
)
|
||||||
|
|
||||||
|
func randEnterpriseMeta(_ *structs.EnterpriseMeta, _ fuzz.Continue) {
|
||||||
|
}
|
@ -108,8 +108,3 @@ func randInterface(m *interface{}, c fuzz.Continue) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO(streaming): this is a quick fix to get the tests passing in enterprise.
|
|
||||||
// This needs to use a real random value once enterprise support is complete.
|
|
||||||
func randEnterpriseMeta(_ *structs.EnterpriseMeta, _ fuzz.Continue) {
|
|
||||||
}
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user