mirror of
https://github.com/status-im/consul.git
synced 2025-01-10 22:06:20 +00:00
Merge pull request #8768 from hashicorp/streaming/add-subscribe-service
subscribe: add subscribe service for streaming change events
This commit is contained in:
commit
da6400192b
@ -1,6 +1,7 @@
|
||||
package consul
|
||||
|
||||
import (
|
||||
"github.com/hashicorp/consul/agent/grpc"
|
||||
"github.com/hashicorp/consul/agent/pool"
|
||||
"github.com/hashicorp/consul/agent/router"
|
||||
"github.com/hashicorp/consul/agent/token"
|
||||
@ -14,4 +15,5 @@ type Deps struct {
|
||||
Tokens *token.Store
|
||||
Router *router.Router
|
||||
ConnPool *pool.ConnPool
|
||||
GRPCConnPool *grpc.ClientConnPool
|
||||
}
|
||||
|
@ -18,24 +18,6 @@ import (
|
||||
"time"
|
||||
|
||||
metrics "github.com/armon/go-metrics"
|
||||
"github.com/hashicorp/consul/acl"
|
||||
ca "github.com/hashicorp/consul/agent/connect/ca"
|
||||
"github.com/hashicorp/consul/agent/consul/authmethod"
|
||||
"github.com/hashicorp/consul/agent/consul/authmethod/ssoauth"
|
||||
"github.com/hashicorp/consul/agent/consul/autopilot"
|
||||
"github.com/hashicorp/consul/agent/consul/fsm"
|
||||
"github.com/hashicorp/consul/agent/consul/state"
|
||||
"github.com/hashicorp/consul/agent/consul/usagemetrics"
|
||||
"github.com/hashicorp/consul/agent/grpc"
|
||||
"github.com/hashicorp/consul/agent/metadata"
|
||||
"github.com/hashicorp/consul/agent/pool"
|
||||
"github.com/hashicorp/consul/agent/router"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
"github.com/hashicorp/consul/agent/token"
|
||||
"github.com/hashicorp/consul/lib"
|
||||
"github.com/hashicorp/consul/logging"
|
||||
"github.com/hashicorp/consul/tlsutil"
|
||||
"github.com/hashicorp/consul/types"
|
||||
connlimit "github.com/hashicorp/go-connlimit"
|
||||
"github.com/hashicorp/go-hclog"
|
||||
"github.com/hashicorp/go-memdb"
|
||||
@ -44,6 +26,28 @@ import (
|
||||
raftboltdb "github.com/hashicorp/raft-boltdb"
|
||||
"github.com/hashicorp/serf/serf"
|
||||
"golang.org/x/time/rate"
|
||||
"google.golang.org/grpc"
|
||||
|
||||
"github.com/hashicorp/consul/acl"
|
||||
ca "github.com/hashicorp/consul/agent/connect/ca"
|
||||
"github.com/hashicorp/consul/agent/consul/authmethod"
|
||||
"github.com/hashicorp/consul/agent/consul/authmethod/ssoauth"
|
||||
"github.com/hashicorp/consul/agent/consul/autopilot"
|
||||
"github.com/hashicorp/consul/agent/consul/fsm"
|
||||
"github.com/hashicorp/consul/agent/consul/state"
|
||||
"github.com/hashicorp/consul/agent/consul/usagemetrics"
|
||||
agentgrpc "github.com/hashicorp/consul/agent/grpc"
|
||||
"github.com/hashicorp/consul/agent/metadata"
|
||||
"github.com/hashicorp/consul/agent/pool"
|
||||
"github.com/hashicorp/consul/agent/router"
|
||||
"github.com/hashicorp/consul/agent/rpc/subscribe"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
"github.com/hashicorp/consul/agent/token"
|
||||
"github.com/hashicorp/consul/lib"
|
||||
"github.com/hashicorp/consul/logging"
|
||||
"github.com/hashicorp/consul/proto/pbsubscribe"
|
||||
"github.com/hashicorp/consul/tlsutil"
|
||||
"github.com/hashicorp/consul/types"
|
||||
)
|
||||
|
||||
// These are the protocol versions that Consul can _understand_. These are
|
||||
@ -583,7 +587,7 @@ func NewServer(config *Config, flat Deps) (*Server, error) {
|
||||
}
|
||||
go reporter.Run(&lib.StopChannelContext{StopCh: s.shutdownCh})
|
||||
|
||||
s.grpcHandler = newGRPCHandlerFromConfig(logger, config)
|
||||
s.grpcHandler = newGRPCHandlerFromConfig(flat, config, s)
|
||||
|
||||
// Initialize Autopilot. This must happen before starting leadership monitoring
|
||||
// as establishing leadership could attempt to use autopilot and cause a panic.
|
||||
@ -612,12 +616,17 @@ func NewServer(config *Config, flat Deps) (*Server, error) {
|
||||
return s, nil
|
||||
}
|
||||
|
||||
func newGRPCHandlerFromConfig(logger hclog.Logger, config *Config) connHandler {
|
||||
func newGRPCHandlerFromConfig(deps Deps, config *Config, s *Server) connHandler {
|
||||
if !config.EnableGRPCServer {
|
||||
return grpc.NoOpHandler{Logger: logger}
|
||||
return agentgrpc.NoOpHandler{Logger: deps.Logger}
|
||||
}
|
||||
|
||||
return grpc.NewHandler(config.RPCAddr)
|
||||
register := func(srv *grpc.Server) {
|
||||
pbsubscribe.RegisterStateChangeSubscriptionServer(srv, subscribe.NewServer(
|
||||
&subscribeBackend{srv: s, connPool: deps.GRPCConnPool},
|
||||
deps.Logger.Named("grpc-api.subscription")))
|
||||
}
|
||||
return agentgrpc.NewHandler(config.RPCAddr, register)
|
||||
}
|
||||
|
||||
func (s *Server) connectCARootsMonitor(ctx context.Context) {
|
||||
|
@ -5,9 +5,10 @@ import (
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
memdb "github.com/hashicorp/go-memdb"
|
||||
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
pbacl "github.com/hashicorp/consul/proto/pbacl"
|
||||
memdb "github.com/hashicorp/go-memdb"
|
||||
)
|
||||
|
||||
type TokenPoliciesIndex struct {
|
||||
|
@ -19,7 +19,7 @@ const (
|
||||
|
||||
// ErrSubscriptionClosed is a error signalling the subscription has been
|
||||
// closed. The client should Unsubscribe, then re-Subscribe.
|
||||
var ErrSubscriptionClosed = errors.New("subscription closed by server, client should resubscribe")
|
||||
var ErrSubscriptionClosed = errors.New("subscription closed by server, client must reset state and resubscribe")
|
||||
|
||||
// Subscription provides events on a Topic. Events may be filtered by Key.
|
||||
// Events are returned by Next(), and may start with a Snapshot of events.
|
||||
|
43
agent/consul/subscribe_backend.go
Normal file
43
agent/consul/subscribe_backend.go
Normal file
@ -0,0 +1,43 @@
|
||||
package consul
|
||||
|
||||
import (
|
||||
"google.golang.org/grpc"
|
||||
|
||||
"github.com/hashicorp/consul/acl"
|
||||
"github.com/hashicorp/consul/agent/consul/stream"
|
||||
agentgrpc "github.com/hashicorp/consul/agent/grpc"
|
||||
"github.com/hashicorp/consul/agent/rpc/subscribe"
|
||||
)
|
||||
|
||||
type subscribeBackend struct {
|
||||
srv *Server
|
||||
connPool *agentgrpc.ClientConnPool
|
||||
}
|
||||
|
||||
// TODO: refactor Resolve methods to an ACLBackend that can be used by all
|
||||
// the endpoints.
|
||||
func (s subscribeBackend) ResolveToken(token string) (acl.Authorizer, error) {
|
||||
return s.srv.ResolveToken(token)
|
||||
}
|
||||
|
||||
var _ subscribe.Backend = (*subscribeBackend)(nil)
|
||||
|
||||
// Forward requests to a remote datacenter by calling f if the target dc does not
|
||||
// match the config. Does nothing but return handled=false if dc is not specified,
|
||||
// or if it matches the Datacenter in config.
|
||||
//
|
||||
// TODO: extract this so that it can be used with other grpc services.
|
||||
func (s subscribeBackend) Forward(dc string, f func(*grpc.ClientConn) error) (handled bool, err error) {
|
||||
if dc == "" || dc == s.srv.config.Datacenter {
|
||||
return false, nil
|
||||
}
|
||||
conn, err := s.connPool.ClientConn(dc)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
return true, f(conn)
|
||||
}
|
||||
|
||||
func (s subscribeBackend) Subscribe(req *stream.SubscribeRequest) (*stream.Subscription, error) {
|
||||
return s.srv.fsm.State().EventPublisher().Subscribe(req)
|
||||
}
|
@ -11,15 +11,16 @@ import (
|
||||
)
|
||||
|
||||
// NewHandler returns a gRPC server that accepts connections from Handle(conn).
|
||||
func NewHandler(addr net.Addr) *Handler {
|
||||
// The register function will be called with the grpc.Server to register
|
||||
// gRPC services with the server.
|
||||
func NewHandler(addr net.Addr, register func(server *grpc.Server)) *Handler {
|
||||
// We don't need to pass tls.Config to the server since it's multiplexed
|
||||
// behind the RPC listener, which already has TLS configured.
|
||||
srv := grpc.NewServer(
|
||||
grpc.StatsHandler(newStatsHandler()),
|
||||
grpc.StreamInterceptor((&activeStreamCounter{}).Intercept),
|
||||
)
|
||||
|
||||
// TODO(streaming): add gRPC services to srv here
|
||||
register(srv)
|
||||
|
||||
lis := &chanListener{addr: addr, conns: make(chan net.Conn)}
|
||||
return &Handler{srv: srv, listener: lis}
|
||||
|
@ -13,6 +13,7 @@ import (
|
||||
"github.com/hashicorp/consul/agent/pool"
|
||||
"github.com/stretchr/testify/require"
|
||||
"golang.org/x/sync/errgroup"
|
||||
"google.golang.org/grpc"
|
||||
)
|
||||
|
||||
type testServer struct {
|
||||
@ -28,9 +29,9 @@ func (s testServer) Metadata() *metadata.Server {
|
||||
|
||||
func newTestServer(t *testing.T, name string, dc string) testServer {
|
||||
addr := &net.IPAddr{IP: net.ParseIP("127.0.0.1")}
|
||||
handler := NewHandler(addr)
|
||||
|
||||
testservice.RegisterSimpleServer(handler.srv, &simple{name: name, dc: dc})
|
||||
handler := NewHandler(addr, func(server *grpc.Server) {
|
||||
testservice.RegisterSimpleServer(server, &simple{name: name, dc: dc})
|
||||
})
|
||||
|
||||
lis, err := net.Listen("tcp", "127.0.0.1:0")
|
||||
require.NoError(t, err)
|
||||
|
@ -14,11 +14,14 @@ import (
|
||||
"google.golang.org/grpc"
|
||||
)
|
||||
|
||||
func noopRegister(*grpc.Server) {}
|
||||
|
||||
func TestHandler_EmitsStats(t *testing.T) {
|
||||
sink := patchGlobalMetrics(t)
|
||||
|
||||
addr := &net.IPAddr{IP: net.ParseIP("127.0.0.1")}
|
||||
handler := NewHandler(addr)
|
||||
handler := NewHandler(addr, noopRegister)
|
||||
|
||||
testservice.RegisterSimpleServer(handler.srv, &simple{})
|
||||
|
||||
lis, err := net.Listen("tcp", "127.0.0.1:0")
|
||||
|
22
agent/rpc/subscribe/auth.go
Normal file
22
agent/rpc/subscribe/auth.go
Normal file
@ -0,0 +1,22 @@
|
||||
package subscribe
|
||||
|
||||
import (
|
||||
"github.com/hashicorp/consul/acl"
|
||||
"github.com/hashicorp/consul/agent/consul/state"
|
||||
"github.com/hashicorp/consul/agent/consul/stream"
|
||||
)
|
||||
|
||||
// EnforceACL takes an acl.Authorizer and returns the decision for whether the
|
||||
// event is allowed to be sent to this client or not.
|
||||
func enforceACL(authz acl.Authorizer, e stream.Event) acl.EnforcementDecision {
|
||||
switch {
|
||||
case e.IsEndOfSnapshot(), e.IsNewSnapshotToFollow():
|
||||
return acl.Allow
|
||||
}
|
||||
|
||||
switch p := e.Payload.(type) {
|
||||
case state.EventPayloadCheckServiceNode:
|
||||
return p.Value.CanRead(authz)
|
||||
}
|
||||
return acl.Deny
|
||||
}
|
72
agent/rpc/subscribe/logger.go
Normal file
72
agent/rpc/subscribe/logger.go
Normal file
@ -0,0 +1,72 @@
|
||||
package subscribe
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/go-uuid"
|
||||
|
||||
"github.com/hashicorp/consul/agent/consul/stream"
|
||||
"github.com/hashicorp/consul/proto/pbsubscribe"
|
||||
)
|
||||
|
||||
// streamID is used in logs as a unique identifier for a subscription. The value
|
||||
// is created lazily on the first call to String() so that we do not call it
|
||||
// if trace logging is disabled.
|
||||
// If a random UUID can not be created, defaults to the current time formatted
|
||||
// as RFC3339Nano.
|
||||
//
|
||||
// TODO(banks) it might be nice one day to replace this with OpenTracing ID
|
||||
// if one is set etc. but probably pointless until we support that properly
|
||||
// in other places so it's actually propagated properly. For now this just
|
||||
// makes lifetime of a stream more traceable in our regular server logs for
|
||||
// debugging/dev.
|
||||
type streamID struct {
|
||||
once sync.Once
|
||||
id string
|
||||
}
|
||||
|
||||
func (s *streamID) String() string {
|
||||
s.once.Do(func() {
|
||||
var err error
|
||||
s.id, err = uuid.GenerateUUID()
|
||||
if err != nil {
|
||||
s.id = time.Now().Format(time.RFC3339Nano)
|
||||
}
|
||||
})
|
||||
return s.id
|
||||
}
|
||||
|
||||
func (h *Server) newLoggerForRequest(req *pbsubscribe.SubscribeRequest) Logger {
|
||||
return h.Logger.With(
|
||||
"topic", req.Topic.String(),
|
||||
"dc", req.Datacenter,
|
||||
"key", req.Key,
|
||||
"index", req.Index,
|
||||
"stream_id", &streamID{})
|
||||
}
|
||||
|
||||
type eventLogger struct {
|
||||
logger Logger
|
||||
snapshotDone bool
|
||||
count uint64
|
||||
}
|
||||
|
||||
func (l *eventLogger) Trace(e []stream.Event) {
|
||||
if len(e) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
first := e[0]
|
||||
switch {
|
||||
case first.IsEndOfSnapshot():
|
||||
l.snapshotDone = true
|
||||
l.logger.Trace("snapshot complete", "index", first.Index, "sent", l.count)
|
||||
case first.IsNewSnapshotToFollow():
|
||||
return
|
||||
case l.snapshotDone:
|
||||
l.logger.Trace("sending events", "index", first.Index, "sent", l.count, "batch_size", len(e))
|
||||
}
|
||||
|
||||
l.count += uint64(len(e))
|
||||
}
|
212
agent/rpc/subscribe/subscribe.go
Normal file
212
agent/rpc/subscribe/subscribe.go
Normal file
@ -0,0 +1,212 @@
|
||||
package subscribe
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
|
||||
"github.com/hashicorp/go-hclog"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/status"
|
||||
|
||||
"github.com/hashicorp/consul/acl"
|
||||
"github.com/hashicorp/consul/agent/consul/state"
|
||||
"github.com/hashicorp/consul/agent/consul/stream"
|
||||
"github.com/hashicorp/consul/proto/pbservice"
|
||||
"github.com/hashicorp/consul/proto/pbsubscribe"
|
||||
)
|
||||
|
||||
// Server implements a StateChangeSubscriptionServer for accepting SubscribeRequests,
|
||||
// and sending events to the subscription topic.
|
||||
type Server struct {
|
||||
Backend Backend
|
||||
Logger Logger
|
||||
}
|
||||
|
||||
func NewServer(backend Backend, logger Logger) *Server {
|
||||
return &Server{Backend: backend, Logger: logger}
|
||||
}
|
||||
|
||||
type Logger interface {
|
||||
Trace(msg string, args ...interface{})
|
||||
With(args ...interface{}) hclog.Logger
|
||||
}
|
||||
|
||||
var _ pbsubscribe.StateChangeSubscriptionServer = (*Server)(nil)
|
||||
|
||||
type Backend interface {
|
||||
// TODO(streaming): Use ResolveTokenAndDefaultMeta instead once SubscribeRequest
|
||||
// has an EnterpriseMeta.
|
||||
ResolveToken(token string) (acl.Authorizer, error)
|
||||
Forward(dc string, f func(*grpc.ClientConn) error) (handled bool, err error)
|
||||
Subscribe(req *stream.SubscribeRequest) (*stream.Subscription, error)
|
||||
}
|
||||
|
||||
func (h *Server) Subscribe(req *pbsubscribe.SubscribeRequest, serverStream pbsubscribe.StateChangeSubscription_SubscribeServer) error {
|
||||
logger := h.newLoggerForRequest(req)
|
||||
handled, err := h.Backend.Forward(req.Datacenter, forwardToDC(req, serverStream, logger))
|
||||
if handled || err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
logger.Trace("new subscription")
|
||||
defer logger.Trace("subscription closed")
|
||||
|
||||
// Resolve the token and create the ACL filter.
|
||||
authz, err := h.Backend.ResolveToken(req.Token)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
sub, err := h.Backend.Subscribe(toStreamSubscribeRequest(req))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer sub.Unsubscribe()
|
||||
|
||||
ctx := serverStream.Context()
|
||||
elog := &eventLogger{logger: logger}
|
||||
for {
|
||||
events, err := sub.Next(ctx)
|
||||
switch {
|
||||
case errors.Is(err, stream.ErrSubscriptionClosed):
|
||||
logger.Trace("subscription reset by server")
|
||||
return status.Error(codes.Aborted, err.Error())
|
||||
case err != nil:
|
||||
return err
|
||||
}
|
||||
|
||||
events = filterStreamEvents(authz, events)
|
||||
if len(events) == 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
elog.Trace(events)
|
||||
e := newEventFromStreamEvents(req, events)
|
||||
if err := serverStream.Send(e); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: can be replaced by mog conversion
|
||||
func toStreamSubscribeRequest(req *pbsubscribe.SubscribeRequest) *stream.SubscribeRequest {
|
||||
return &stream.SubscribeRequest{
|
||||
Topic: req.Topic,
|
||||
Key: req.Key,
|
||||
Token: req.Token,
|
||||
Index: req.Index,
|
||||
}
|
||||
}
|
||||
|
||||
func forwardToDC(
|
||||
req *pbsubscribe.SubscribeRequest,
|
||||
serverStream pbsubscribe.StateChangeSubscription_SubscribeServer,
|
||||
logger Logger,
|
||||
) func(conn *grpc.ClientConn) error {
|
||||
return func(conn *grpc.ClientConn) error {
|
||||
logger.Trace("forwarding to another DC")
|
||||
defer logger.Trace("forwarded stream closed")
|
||||
|
||||
client := pbsubscribe.NewStateChangeSubscriptionClient(conn)
|
||||
streamHandle, err := client.Subscribe(serverStream.Context(), req)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for {
|
||||
event, err := streamHandle.Recv()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if err := serverStream.Send(event); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// filterStreamEvents to only those allowed by the acl token.
|
||||
func filterStreamEvents(authz acl.Authorizer, events []stream.Event) []stream.Event {
|
||||
// authz will be nil when ACLs are disabled
|
||||
if authz == nil || len(events) == 0 {
|
||||
return events
|
||||
}
|
||||
|
||||
// Fast path for the common case of only 1 event since we can avoid slice
|
||||
// allocation in the hot path of every single update event delivered in vast
|
||||
// majority of cases with this. Note that this is called _per event/item_ when
|
||||
// sending snapshots which is a lot worse than being called once on regular
|
||||
// result.
|
||||
if len(events) == 1 {
|
||||
if enforceACL(authz, events[0]) == acl.Allow {
|
||||
return events
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
var filtered []stream.Event
|
||||
for idx := range events {
|
||||
event := events[idx]
|
||||
if enforceACL(authz, event) == acl.Allow {
|
||||
filtered = append(filtered, event)
|
||||
}
|
||||
}
|
||||
return filtered
|
||||
}
|
||||
|
||||
func newEventFromStreamEvents(req *pbsubscribe.SubscribeRequest, events []stream.Event) *pbsubscribe.Event {
|
||||
e := &pbsubscribe.Event{
|
||||
Topic: req.Topic,
|
||||
Key: req.Key,
|
||||
Index: events[0].Index,
|
||||
}
|
||||
|
||||
if len(events) == 1 {
|
||||
event := events[0]
|
||||
// TODO: refactor so these are only checked once, instead of 3 times.
|
||||
switch {
|
||||
case event.IsEndOfSnapshot():
|
||||
e.Payload = &pbsubscribe.Event_EndOfSnapshot{EndOfSnapshot: true}
|
||||
return e
|
||||
case event.IsNewSnapshotToFollow():
|
||||
e.Payload = &pbsubscribe.Event_NewSnapshotToFollow{NewSnapshotToFollow: true}
|
||||
return e
|
||||
}
|
||||
|
||||
setPayload(e, event.Payload)
|
||||
return e
|
||||
}
|
||||
|
||||
e.Payload = &pbsubscribe.Event_EventBatch{
|
||||
EventBatch: &pbsubscribe.EventBatch{
|
||||
Events: batchEventsFromEventSlice(events),
|
||||
},
|
||||
}
|
||||
return e
|
||||
}
|
||||
|
||||
func setPayload(e *pbsubscribe.Event, payload interface{}) {
|
||||
switch p := payload.(type) {
|
||||
case state.EventPayloadCheckServiceNode:
|
||||
e.Payload = &pbsubscribe.Event_ServiceHealth{
|
||||
ServiceHealth: &pbsubscribe.ServiceHealthUpdate{
|
||||
Op: p.Op,
|
||||
// TODO: this could be cached
|
||||
CheckServiceNode: pbservice.NewCheckServiceNodeFromStructs(p.Value),
|
||||
},
|
||||
}
|
||||
default:
|
||||
panic(fmt.Sprintf("unexpected payload: %T: %#v", p, p))
|
||||
}
|
||||
}
|
||||
|
||||
func batchEventsFromEventSlice(events []stream.Event) []*pbsubscribe.Event {
|
||||
result := make([]*pbsubscribe.Event, len(events))
|
||||
for i := range events {
|
||||
event := events[i]
|
||||
result[i] = &pbsubscribe.Event{Key: event.Key, Index: event.Index}
|
||||
setPayload(result[i], event.Payload)
|
||||
}
|
||||
return result
|
||||
}
|
875
agent/rpc/subscribe/subscribe_test.go
Normal file
875
agent/rpc/subscribe/subscribe_test.go
Normal file
@ -0,0 +1,875 @@
|
||||
package subscribe
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"io"
|
||||
"net"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/google/go-cmp/cmp"
|
||||
"github.com/hashicorp/go-hclog"
|
||||
"github.com/hashicorp/go-uuid"
|
||||
"github.com/stretchr/testify/require"
|
||||
"golang.org/x/sync/errgroup"
|
||||
gogrpc "google.golang.org/grpc"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/status"
|
||||
|
||||
"github.com/hashicorp/consul/acl"
|
||||
"github.com/hashicorp/consul/agent/consul/state"
|
||||
"github.com/hashicorp/consul/agent/consul/stream"
|
||||
"github.com/hashicorp/consul/agent/grpc"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
"github.com/hashicorp/consul/api"
|
||||
"github.com/hashicorp/consul/proto/pbcommon"
|
||||
"github.com/hashicorp/consul/proto/pbservice"
|
||||
"github.com/hashicorp/consul/proto/pbsubscribe"
|
||||
"github.com/hashicorp/consul/types"
|
||||
)
|
||||
|
||||
func TestServer_Subscribe_IntegrationWithBackend(t *testing.T) {
|
||||
backend, err := newTestBackend()
|
||||
require.NoError(t, err)
|
||||
srv := NewServer(backend, hclog.New(nil))
|
||||
addr := newTestServer(t, srv)
|
||||
ids := newCounter()
|
||||
|
||||
{
|
||||
req := &structs.RegisterRequest{
|
||||
Node: "other",
|
||||
Address: "2.3.4.5",
|
||||
Datacenter: "dc1",
|
||||
Service: &structs.NodeService{
|
||||
ID: "api1",
|
||||
Service: "api",
|
||||
Address: "2.3.4.5",
|
||||
Port: 9000,
|
||||
},
|
||||
}
|
||||
require.NoError(t, backend.store.EnsureRegistration(ids.Next("other"), req))
|
||||
}
|
||||
{
|
||||
req := &structs.RegisterRequest{
|
||||
Node: "node1",
|
||||
Address: "3.4.5.6",
|
||||
Datacenter: "dc1",
|
||||
Service: &structs.NodeService{
|
||||
ID: "redis1",
|
||||
Service: "redis",
|
||||
Address: "3.4.5.6",
|
||||
Port: 8080,
|
||||
},
|
||||
}
|
||||
require.NoError(t, backend.store.EnsureRegistration(ids.Next("reg2"), req))
|
||||
}
|
||||
req := &structs.RegisterRequest{
|
||||
Node: "node2",
|
||||
Address: "1.2.3.4",
|
||||
Datacenter: "dc1",
|
||||
Service: &structs.NodeService{
|
||||
ID: "redis1",
|
||||
Service: "redis",
|
||||
Address: "1.1.1.1",
|
||||
Port: 8080,
|
||||
},
|
||||
}
|
||||
require.NoError(t, backend.store.EnsureRegistration(ids.Next("reg3"), req))
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||
t.Cleanup(cancel)
|
||||
|
||||
conn, err := gogrpc.DialContext(ctx, addr.String(), gogrpc.WithInsecure())
|
||||
require.NoError(t, err)
|
||||
t.Cleanup(logError(t, conn.Close))
|
||||
|
||||
streamClient := pbsubscribe.NewStateChangeSubscriptionClient(conn)
|
||||
streamHandle, err := streamClient.Subscribe(ctx, &pbsubscribe.SubscribeRequest{
|
||||
Topic: pbsubscribe.Topic_ServiceHealth,
|
||||
Key: "redis",
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
chEvents := make(chan eventOrError, 0)
|
||||
go recvEvents(chEvents, streamHandle)
|
||||
|
||||
var snapshotEvents []*pbsubscribe.Event
|
||||
for i := 0; i < 3; i++ {
|
||||
snapshotEvents = append(snapshotEvents, getEvent(t, chEvents))
|
||||
}
|
||||
|
||||
expected := []*pbsubscribe.Event{
|
||||
{
|
||||
Topic: pbsubscribe.Topic_ServiceHealth,
|
||||
Key: "redis",
|
||||
Index: ids.Last(),
|
||||
Payload: &pbsubscribe.Event_ServiceHealth{
|
||||
ServiceHealth: &pbsubscribe.ServiceHealthUpdate{
|
||||
Op: pbsubscribe.CatalogOp_Register,
|
||||
CheckServiceNode: &pbservice.CheckServiceNode{
|
||||
Node: &pbservice.Node{
|
||||
Node: "node1",
|
||||
Datacenter: "dc1",
|
||||
Address: "3.4.5.6",
|
||||
RaftIndex: raftIndex(ids, "reg2", "reg2"),
|
||||
},
|
||||
Service: &pbservice.NodeService{
|
||||
ID: "redis1",
|
||||
Service: "redis",
|
||||
Address: "3.4.5.6",
|
||||
Port: 8080,
|
||||
Weights: &pbservice.Weights{Passing: 1, Warning: 1},
|
||||
// Sad empty state
|
||||
Proxy: pbservice.ConnectProxyConfig{
|
||||
MeshGateway: pbservice.MeshGatewayConfig{},
|
||||
Expose: pbservice.ExposeConfig{},
|
||||
},
|
||||
RaftIndex: raftIndex(ids, "reg2", "reg2"),
|
||||
EnterpriseMeta: pbcommon.EnterpriseMeta{},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
Topic: pbsubscribe.Topic_ServiceHealth,
|
||||
Key: "redis",
|
||||
Index: ids.Last(),
|
||||
Payload: &pbsubscribe.Event_ServiceHealth{
|
||||
ServiceHealth: &pbsubscribe.ServiceHealthUpdate{
|
||||
Op: pbsubscribe.CatalogOp_Register,
|
||||
CheckServiceNode: &pbservice.CheckServiceNode{
|
||||
Node: &pbservice.Node{
|
||||
Node: "node2",
|
||||
Datacenter: "dc1",
|
||||
Address: "1.2.3.4",
|
||||
RaftIndex: raftIndex(ids, "reg3", "reg3"),
|
||||
},
|
||||
Service: &pbservice.NodeService{
|
||||
ID: "redis1",
|
||||
Service: "redis",
|
||||
Address: "1.1.1.1",
|
||||
Port: 8080,
|
||||
Weights: &pbservice.Weights{Passing: 1, Warning: 1},
|
||||
// Sad empty state
|
||||
Proxy: pbservice.ConnectProxyConfig{
|
||||
MeshGateway: pbservice.MeshGatewayConfig{},
|
||||
Expose: pbservice.ExposeConfig{},
|
||||
},
|
||||
RaftIndex: raftIndex(ids, "reg3", "reg3"),
|
||||
EnterpriseMeta: pbcommon.EnterpriseMeta{},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
Topic: pbsubscribe.Topic_ServiceHealth,
|
||||
Key: "redis",
|
||||
Index: ids.Last(),
|
||||
Payload: &pbsubscribe.Event_EndOfSnapshot{EndOfSnapshot: true},
|
||||
},
|
||||
}
|
||||
assertDeepEqual(t, expected, snapshotEvents)
|
||||
|
||||
// Update the registration by adding a check.
|
||||
req.Check = &structs.HealthCheck{
|
||||
Node: "node2",
|
||||
CheckID: types.CheckID("check1"),
|
||||
ServiceID: "redis1",
|
||||
ServiceName: "redis",
|
||||
Name: "check 1",
|
||||
}
|
||||
require.NoError(t, backend.store.EnsureRegistration(ids.Next("update"), req))
|
||||
|
||||
event := getEvent(t, chEvents)
|
||||
expectedEvent := &pbsubscribe.Event{
|
||||
Topic: pbsubscribe.Topic_ServiceHealth,
|
||||
Key: "redis",
|
||||
Index: ids.Last(),
|
||||
Payload: &pbsubscribe.Event_ServiceHealth{
|
||||
ServiceHealth: &pbsubscribe.ServiceHealthUpdate{
|
||||
Op: pbsubscribe.CatalogOp_Register,
|
||||
CheckServiceNode: &pbservice.CheckServiceNode{
|
||||
Node: &pbservice.Node{
|
||||
Node: "node2",
|
||||
Datacenter: "dc1",
|
||||
Address: "1.2.3.4",
|
||||
RaftIndex: raftIndex(ids, "reg3", "reg3"),
|
||||
},
|
||||
Service: &pbservice.NodeService{
|
||||
ID: "redis1",
|
||||
Service: "redis",
|
||||
Address: "1.1.1.1",
|
||||
Port: 8080,
|
||||
Weights: &pbservice.Weights{Passing: 1, Warning: 1},
|
||||
// Sad empty state
|
||||
Proxy: pbservice.ConnectProxyConfig{
|
||||
MeshGateway: pbservice.MeshGatewayConfig{},
|
||||
Expose: pbservice.ExposeConfig{},
|
||||
},
|
||||
RaftIndex: raftIndex(ids, "reg3", "reg3"),
|
||||
EnterpriseMeta: pbcommon.EnterpriseMeta{},
|
||||
},
|
||||
Checks: []*pbservice.HealthCheck{
|
||||
{
|
||||
CheckID: "check1",
|
||||
Name: "check 1",
|
||||
Node: "node2",
|
||||
Status: "critical",
|
||||
ServiceID: "redis1",
|
||||
ServiceName: "redis",
|
||||
RaftIndex: raftIndex(ids, "update", "update"),
|
||||
EnterpriseMeta: pbcommon.EnterpriseMeta{},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
assertDeepEqual(t, expectedEvent, event)
|
||||
}
|
||||
|
||||
type eventOrError struct {
|
||||
event *pbsubscribe.Event
|
||||
err error
|
||||
}
|
||||
|
||||
// recvEvents from handle and sends them to the provided channel.
|
||||
func recvEvents(ch chan eventOrError, handle pbsubscribe.StateChangeSubscription_SubscribeClient) {
|
||||
defer close(ch)
|
||||
for {
|
||||
event, err := handle.Recv()
|
||||
switch {
|
||||
case errors.Is(err, io.EOF):
|
||||
return
|
||||
case errors.Is(err, context.Canceled), errors.Is(err, context.DeadlineExceeded):
|
||||
return
|
||||
case err != nil:
|
||||
ch <- eventOrError{err: err}
|
||||
return
|
||||
default:
|
||||
ch <- eventOrError{event: event}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func getEvent(t *testing.T, ch chan eventOrError) *pbsubscribe.Event {
|
||||
t.Helper()
|
||||
select {
|
||||
case item := <-ch:
|
||||
require.NoError(t, item.err)
|
||||
return item.event
|
||||
case <-time.After(10 * time.Second):
|
||||
t.Fatalf("timeout waiting on event from server")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func assertDeepEqual(t *testing.T, x, y interface{}) {
|
||||
t.Helper()
|
||||
if diff := cmp.Diff(x, y); diff != "" {
|
||||
t.Fatalf("assertion failed: values are not equal\n--- expected\n+++ actual\n%v", diff)
|
||||
}
|
||||
}
|
||||
|
||||
type testBackend struct {
|
||||
store *state.Store
|
||||
authorizer func(token string) acl.Authorizer
|
||||
forwardConn *gogrpc.ClientConn
|
||||
}
|
||||
|
||||
func (b testBackend) ResolveToken(token string) (acl.Authorizer, error) {
|
||||
return b.authorizer(token), nil
|
||||
}
|
||||
|
||||
func (b testBackend) Forward(_ string, fn func(*gogrpc.ClientConn) error) (handled bool, err error) {
|
||||
if b.forwardConn != nil {
|
||||
return true, fn(b.forwardConn)
|
||||
}
|
||||
return false, nil
|
||||
}
|
||||
|
||||
func (b testBackend) Subscribe(req *stream.SubscribeRequest) (*stream.Subscription, error) {
|
||||
return b.store.EventPublisher().Subscribe(req)
|
||||
}
|
||||
|
||||
func newTestBackend() (*testBackend, error) {
|
||||
gc, err := state.NewTombstoneGC(time.Second, time.Millisecond)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
store, err := state.NewStateStore(gc)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
allowAll := func(_ string) acl.Authorizer {
|
||||
return acl.AllowAll()
|
||||
}
|
||||
return &testBackend{store: store, authorizer: allowAll}, nil
|
||||
}
|
||||
|
||||
var _ Backend = (*testBackend)(nil)
|
||||
|
||||
func newTestServer(t *testing.T, server *Server) net.Addr {
|
||||
addr := &net.IPAddr{IP: net.ParseIP("127.0.0.1")}
|
||||
var grpcServer *gogrpc.Server
|
||||
handler := grpc.NewHandler(addr, func(srv *gogrpc.Server) {
|
||||
grpcServer = srv
|
||||
pbsubscribe.RegisterStateChangeSubscriptionServer(srv, server)
|
||||
})
|
||||
|
||||
lis, err := net.Listen("tcp", "127.0.0.1:0")
|
||||
require.NoError(t, err)
|
||||
t.Cleanup(logError(t, lis.Close))
|
||||
|
||||
go grpcServer.Serve(lis)
|
||||
g := new(errgroup.Group)
|
||||
g.Go(func() error {
|
||||
return grpcServer.Serve(lis)
|
||||
})
|
||||
t.Cleanup(func() {
|
||||
if err := handler.Shutdown(); err != nil {
|
||||
t.Logf("grpc server shutdown: %v", err)
|
||||
}
|
||||
if err := g.Wait(); err != nil {
|
||||
t.Logf("grpc server error: %v", err)
|
||||
}
|
||||
})
|
||||
return lis.Addr()
|
||||
}
|
||||
|
||||
type counter struct {
|
||||
value uint64
|
||||
labels map[string]uint64
|
||||
}
|
||||
|
||||
func (c *counter) Next(label string) uint64 {
|
||||
c.value++
|
||||
c.labels[label] = c.value
|
||||
return c.value
|
||||
}
|
||||
|
||||
func (c *counter) For(label string) uint64 {
|
||||
return c.labels[label]
|
||||
}
|
||||
|
||||
func (c *counter) Last() uint64 {
|
||||
return c.value
|
||||
}
|
||||
|
||||
func newCounter() *counter {
|
||||
return &counter{labels: make(map[string]uint64)}
|
||||
}
|
||||
|
||||
func raftIndex(ids *counter, created, modified string) pbcommon.RaftIndex {
|
||||
return pbcommon.RaftIndex{
|
||||
CreateIndex: ids.For(created),
|
||||
ModifyIndex: ids.For(modified),
|
||||
}
|
||||
}
|
||||
|
||||
func TestServer_Subscribe_IntegrationWithBackend_ForwardToDC(t *testing.T) {
|
||||
backendLocal, err := newTestBackend()
|
||||
require.NoError(t, err)
|
||||
addrLocal := newTestServer(t, NewServer(backendLocal, hclog.New(nil)))
|
||||
|
||||
backendRemoteDC, err := newTestBackend()
|
||||
require.NoError(t, err)
|
||||
srvRemoteDC := NewServer(backendRemoteDC, hclog.New(nil))
|
||||
addrRemoteDC := newTestServer(t, srvRemoteDC)
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||
t.Cleanup(cancel)
|
||||
|
||||
connRemoteDC, err := gogrpc.DialContext(ctx, addrRemoteDC.String(), gogrpc.WithInsecure())
|
||||
require.NoError(t, err)
|
||||
t.Cleanup(logError(t, connRemoteDC.Close))
|
||||
backendLocal.forwardConn = connRemoteDC
|
||||
|
||||
ids := newCounter()
|
||||
{
|
||||
req := &structs.RegisterRequest{
|
||||
Node: "other",
|
||||
Address: "2.3.4.5",
|
||||
Datacenter: "dc2",
|
||||
Service: &structs.NodeService{
|
||||
ID: "api1",
|
||||
Service: "api",
|
||||
Address: "2.3.4.5",
|
||||
Port: 9000,
|
||||
},
|
||||
}
|
||||
require.NoError(t, backendRemoteDC.store.EnsureRegistration(ids.Next("reg1"), req))
|
||||
}
|
||||
{
|
||||
req := &structs.RegisterRequest{
|
||||
Node: "node1",
|
||||
Address: "3.4.5.6",
|
||||
Datacenter: "dc2",
|
||||
Service: &structs.NodeService{
|
||||
ID: "redis1",
|
||||
Service: "redis",
|
||||
Address: "3.4.5.6",
|
||||
Port: 8080,
|
||||
},
|
||||
}
|
||||
require.NoError(t, backendRemoteDC.store.EnsureRegistration(ids.Next("reg2"), req))
|
||||
}
|
||||
|
||||
req := &structs.RegisterRequest{
|
||||
Node: "node2",
|
||||
Address: "1.2.3.4",
|
||||
Datacenter: "dc2",
|
||||
Service: &structs.NodeService{
|
||||
ID: "redis1",
|
||||
Service: "redis",
|
||||
Address: "1.1.1.1",
|
||||
Port: 8080,
|
||||
},
|
||||
}
|
||||
require.NoError(t, backendRemoteDC.store.EnsureRegistration(ids.Next("reg3"), req))
|
||||
|
||||
connLocal, err := gogrpc.DialContext(ctx, addrLocal.String(), gogrpc.WithInsecure())
|
||||
require.NoError(t, err)
|
||||
t.Cleanup(logError(t, connLocal.Close))
|
||||
|
||||
streamClient := pbsubscribe.NewStateChangeSubscriptionClient(connLocal)
|
||||
streamHandle, err := streamClient.Subscribe(ctx, &pbsubscribe.SubscribeRequest{
|
||||
Topic: pbsubscribe.Topic_ServiceHealth,
|
||||
Key: "redis",
|
||||
Datacenter: "dc2",
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
chEvents := make(chan eventOrError, 0)
|
||||
go recvEvents(chEvents, streamHandle)
|
||||
|
||||
var snapshotEvents []*pbsubscribe.Event
|
||||
for i := 0; i < 3; i++ {
|
||||
snapshotEvents = append(snapshotEvents, getEvent(t, chEvents))
|
||||
}
|
||||
|
||||
expected := []*pbsubscribe.Event{
|
||||
{
|
||||
Topic: pbsubscribe.Topic_ServiceHealth,
|
||||
Key: "redis",
|
||||
Index: ids.Last(),
|
||||
Payload: &pbsubscribe.Event_ServiceHealth{
|
||||
ServiceHealth: &pbsubscribe.ServiceHealthUpdate{
|
||||
Op: pbsubscribe.CatalogOp_Register,
|
||||
CheckServiceNode: &pbservice.CheckServiceNode{
|
||||
Node: &pbservice.Node{
|
||||
Node: "node1",
|
||||
Datacenter: "dc2",
|
||||
Address: "3.4.5.6",
|
||||
RaftIndex: raftIndex(ids, "reg2", "reg2"),
|
||||
},
|
||||
Service: &pbservice.NodeService{
|
||||
ID: "redis1",
|
||||
Service: "redis",
|
||||
Address: "3.4.5.6",
|
||||
Port: 8080,
|
||||
Weights: &pbservice.Weights{Passing: 1, Warning: 1},
|
||||
// Sad empty state
|
||||
Proxy: pbservice.ConnectProxyConfig{
|
||||
MeshGateway: pbservice.MeshGatewayConfig{},
|
||||
Expose: pbservice.ExposeConfig{},
|
||||
},
|
||||
EnterpriseMeta: pbcommon.EnterpriseMeta{},
|
||||
RaftIndex: raftIndex(ids, "reg2", "reg2"),
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
Topic: pbsubscribe.Topic_ServiceHealth,
|
||||
Key: "redis",
|
||||
Index: ids.Last(),
|
||||
Payload: &pbsubscribe.Event_ServiceHealth{
|
||||
ServiceHealth: &pbsubscribe.ServiceHealthUpdate{
|
||||
Op: pbsubscribe.CatalogOp_Register,
|
||||
CheckServiceNode: &pbservice.CheckServiceNode{
|
||||
Node: &pbservice.Node{
|
||||
Node: "node2",
|
||||
Datacenter: "dc2",
|
||||
Address: "1.2.3.4",
|
||||
RaftIndex: raftIndex(ids, "reg3", "reg3"),
|
||||
},
|
||||
Service: &pbservice.NodeService{
|
||||
ID: "redis1",
|
||||
Service: "redis",
|
||||
Address: "1.1.1.1",
|
||||
Port: 8080,
|
||||
Weights: &pbservice.Weights{Passing: 1, Warning: 1},
|
||||
// Sad empty state
|
||||
Proxy: pbservice.ConnectProxyConfig{
|
||||
MeshGateway: pbservice.MeshGatewayConfig{},
|
||||
Expose: pbservice.ExposeConfig{},
|
||||
},
|
||||
EnterpriseMeta: pbcommon.EnterpriseMeta{},
|
||||
RaftIndex: raftIndex(ids, "reg3", "reg3"),
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
Topic: pbsubscribe.Topic_ServiceHealth,
|
||||
Key: "redis",
|
||||
Index: ids.Last(),
|
||||
Payload: &pbsubscribe.Event_EndOfSnapshot{EndOfSnapshot: true},
|
||||
},
|
||||
}
|
||||
assertDeepEqual(t, expected, snapshotEvents)
|
||||
|
||||
// Update the registration by adding a check.
|
||||
req.Check = &structs.HealthCheck{
|
||||
Node: "node2",
|
||||
CheckID: types.CheckID("check1"),
|
||||
ServiceID: "redis1",
|
||||
ServiceName: "redis",
|
||||
Name: "check 1",
|
||||
}
|
||||
require.NoError(t, backendRemoteDC.store.EnsureRegistration(ids.Next("update"), req))
|
||||
|
||||
event := getEvent(t, chEvents)
|
||||
expectedEvent := &pbsubscribe.Event{
|
||||
Topic: pbsubscribe.Topic_ServiceHealth,
|
||||
Key: "redis",
|
||||
Index: ids.Last(),
|
||||
Payload: &pbsubscribe.Event_ServiceHealth{
|
||||
ServiceHealth: &pbsubscribe.ServiceHealthUpdate{
|
||||
Op: pbsubscribe.CatalogOp_Register,
|
||||
CheckServiceNode: &pbservice.CheckServiceNode{
|
||||
Node: &pbservice.Node{
|
||||
Node: "node2",
|
||||
Datacenter: "dc2",
|
||||
Address: "1.2.3.4",
|
||||
RaftIndex: raftIndex(ids, "reg3", "reg3"),
|
||||
},
|
||||
Service: &pbservice.NodeService{
|
||||
ID: "redis1",
|
||||
Service: "redis",
|
||||
Address: "1.1.1.1",
|
||||
Port: 8080,
|
||||
RaftIndex: raftIndex(ids, "reg3", "reg3"),
|
||||
Weights: &pbservice.Weights{Passing: 1, Warning: 1},
|
||||
// Sad empty state
|
||||
Proxy: pbservice.ConnectProxyConfig{
|
||||
MeshGateway: pbservice.MeshGatewayConfig{},
|
||||
Expose: pbservice.ExposeConfig{},
|
||||
},
|
||||
EnterpriseMeta: pbcommon.EnterpriseMeta{},
|
||||
},
|
||||
Checks: []*pbservice.HealthCheck{
|
||||
{
|
||||
CheckID: "check1",
|
||||
Name: "check 1",
|
||||
Node: "node2",
|
||||
Status: "critical",
|
||||
ServiceID: "redis1",
|
||||
ServiceName: "redis",
|
||||
RaftIndex: raftIndex(ids, "update", "update"),
|
||||
EnterpriseMeta: pbcommon.EnterpriseMeta{},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
assertDeepEqual(t, expectedEvent, event)
|
||||
}
|
||||
|
||||
// TODO: test case for converting stream.Events to pbsubscribe.Events, including framing events
|
||||
|
||||
func TestServer_Subscribe_IntegrationWithBackend_FilterEventsByACLToken(t *testing.T) {
|
||||
if testing.Short() {
|
||||
t.Skip("too slow for -short run")
|
||||
}
|
||||
|
||||
backend, err := newTestBackend()
|
||||
require.NoError(t, err)
|
||||
srv := NewServer(backend, hclog.New(nil))
|
||||
addr := newTestServer(t, srv)
|
||||
|
||||
// Create a policy for the test token.
|
||||
rules := `
|
||||
service "foo" {
|
||||
policy = "write"
|
||||
}
|
||||
node "node1" {
|
||||
policy = "write"
|
||||
}
|
||||
`
|
||||
authorizer, err := acl.NewAuthorizerFromRules(
|
||||
"1", 0, rules, acl.SyntaxCurrent,
|
||||
&acl.Config{WildcardName: structs.WildcardSpecifier},
|
||||
nil)
|
||||
require.NoError(t, err)
|
||||
authorizer = acl.NewChainedAuthorizer([]acl.Authorizer{authorizer, acl.DenyAll()})
|
||||
require.Equal(t, acl.Deny, authorizer.NodeRead("denied", nil))
|
||||
|
||||
// TODO: is there any easy way to do this with the acl package?
|
||||
token := "this-token-is-good"
|
||||
backend.authorizer = func(tok string) acl.Authorizer {
|
||||
if tok == token {
|
||||
return authorizer
|
||||
}
|
||||
return acl.DenyAll()
|
||||
}
|
||||
|
||||
ids := newCounter()
|
||||
{
|
||||
req := &structs.RegisterRequest{
|
||||
Datacenter: "dc1",
|
||||
Node: "node1",
|
||||
Address: "127.0.0.1",
|
||||
Service: &structs.NodeService{
|
||||
ID: "foo",
|
||||
Service: "foo",
|
||||
},
|
||||
Check: &structs.HealthCheck{
|
||||
CheckID: "service:foo",
|
||||
Name: "service:foo",
|
||||
Node: "node1",
|
||||
ServiceID: "foo",
|
||||
Status: api.HealthPassing,
|
||||
},
|
||||
}
|
||||
require.NoError(t, backend.store.EnsureRegistration(ids.Next("reg1"), req))
|
||||
|
||||
// Register a service which should be denied
|
||||
req = &structs.RegisterRequest{
|
||||
Datacenter: "dc1",
|
||||
Node: "node1",
|
||||
Address: "127.0.0.1",
|
||||
Service: &structs.NodeService{
|
||||
ID: "bar",
|
||||
Service: "bar",
|
||||
},
|
||||
Check: &structs.HealthCheck{
|
||||
CheckID: "service:bar",
|
||||
Name: "service:bar",
|
||||
Node: "node1",
|
||||
ServiceID: "bar",
|
||||
},
|
||||
}
|
||||
require.NoError(t, backend.store.EnsureRegistration(ids.Next("reg2"), req))
|
||||
|
||||
req = &structs.RegisterRequest{
|
||||
Datacenter: "dc1",
|
||||
Node: "denied",
|
||||
Address: "127.0.0.1",
|
||||
Service: &structs.NodeService{
|
||||
ID: "foo",
|
||||
Service: "foo",
|
||||
},
|
||||
}
|
||||
require.NoError(t, backend.store.EnsureRegistration(ids.Next("reg3"), req))
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||
t.Cleanup(cancel)
|
||||
|
||||
conn, err := gogrpc.DialContext(ctx, addr.String(), gogrpc.WithInsecure())
|
||||
require.NoError(t, err)
|
||||
t.Cleanup(logError(t, conn.Close))
|
||||
streamClient := pbsubscribe.NewStateChangeSubscriptionClient(conn)
|
||||
|
||||
// Start a Subscribe call to our streaming endpoint for the service we have access to.
|
||||
{
|
||||
streamHandle, err := streamClient.Subscribe(ctx, &pbsubscribe.SubscribeRequest{
|
||||
Topic: pbsubscribe.Topic_ServiceHealth,
|
||||
Key: "foo",
|
||||
Token: token,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
chEvents := make(chan eventOrError, 0)
|
||||
go recvEvents(chEvents, streamHandle)
|
||||
|
||||
event := getEvent(t, chEvents)
|
||||
require.Equal(t, "foo", event.GetServiceHealth().CheckServiceNode.Service.Service)
|
||||
require.Equal(t, "node1", event.GetServiceHealth().CheckServiceNode.Node.Node)
|
||||
|
||||
require.True(t, getEvent(t, chEvents).GetEndOfSnapshot())
|
||||
|
||||
// Update the service with a new port to trigger a new event.
|
||||
req := &structs.RegisterRequest{
|
||||
Datacenter: "dc1",
|
||||
Node: "node1",
|
||||
Address: "127.0.0.1",
|
||||
Service: &structs.NodeService{
|
||||
ID: "foo",
|
||||
Service: "foo",
|
||||
Port: 1234,
|
||||
},
|
||||
Check: &structs.HealthCheck{
|
||||
CheckID: "service:foo",
|
||||
Name: "service:foo",
|
||||
ServiceID: "foo",
|
||||
Status: api.HealthPassing,
|
||||
Node: "node1",
|
||||
},
|
||||
WriteRequest: structs.WriteRequest{Token: "root"},
|
||||
}
|
||||
require.NoError(t, backend.store.EnsureRegistration(ids.Next("reg4"), req))
|
||||
|
||||
event = getEvent(t, chEvents)
|
||||
service := event.GetServiceHealth().CheckServiceNode.Service
|
||||
require.Equal(t, "foo", service.Service)
|
||||
require.Equal(t, int32(1234), service.Port)
|
||||
|
||||
// Now update the service on the denied node and make sure we don't see an event.
|
||||
req = &structs.RegisterRequest{
|
||||
Datacenter: "dc1",
|
||||
Node: "denied",
|
||||
Address: "127.0.0.1",
|
||||
Service: &structs.NodeService{
|
||||
ID: "foo",
|
||||
Service: "foo",
|
||||
Port: 2345,
|
||||
},
|
||||
Check: &structs.HealthCheck{
|
||||
CheckID: "service:foo",
|
||||
Name: "service:foo",
|
||||
ServiceID: "foo",
|
||||
Status: api.HealthPassing,
|
||||
Node: "denied",
|
||||
},
|
||||
WriteRequest: structs.WriteRequest{Token: "root"},
|
||||
}
|
||||
require.NoError(t, backend.store.EnsureRegistration(ids.Next("reg5"), req))
|
||||
|
||||
select {
|
||||
case event := <-chEvents:
|
||||
t.Fatalf("should not have received event: %v", event)
|
||||
case <-time.After(500 * time.Millisecond):
|
||||
}
|
||||
}
|
||||
|
||||
// Start another subscribe call for bar, which the token shouldn't have access to.
|
||||
{
|
||||
streamHandle, err := streamClient.Subscribe(ctx, &pbsubscribe.SubscribeRequest{
|
||||
Topic: pbsubscribe.Topic_ServiceHealth,
|
||||
Key: "bar",
|
||||
Token: token,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
chEvents := make(chan eventOrError, 0)
|
||||
go recvEvents(chEvents, streamHandle)
|
||||
|
||||
require.True(t, getEvent(t, chEvents).GetEndOfSnapshot())
|
||||
|
||||
// Update the service and make sure we don't get a new event.
|
||||
req := &structs.RegisterRequest{
|
||||
Datacenter: "dc1",
|
||||
Node: "node1",
|
||||
Address: "127.0.0.1",
|
||||
Service: &structs.NodeService{
|
||||
ID: "bar",
|
||||
Service: "bar",
|
||||
Port: 2345,
|
||||
},
|
||||
Check: &structs.HealthCheck{
|
||||
CheckID: "service:bar",
|
||||
Name: "service:bar",
|
||||
ServiceID: "bar",
|
||||
Node: "node1",
|
||||
},
|
||||
WriteRequest: structs.WriteRequest{Token: "root"},
|
||||
}
|
||||
require.NoError(t, backend.store.EnsureRegistration(ids.Next("reg6"), req))
|
||||
|
||||
select {
|
||||
case event := <-chEvents:
|
||||
t.Fatalf("should not have received event: %v", event)
|
||||
case <-time.After(500 * time.Millisecond):
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestServer_Subscribe_IntegrationWithBackend_ACLUpdate(t *testing.T) {
|
||||
backend, err := newTestBackend()
|
||||
require.NoError(t, err)
|
||||
srv := NewServer(backend, hclog.New(nil))
|
||||
addr := newTestServer(t, srv)
|
||||
|
||||
rules := `
|
||||
service "foo" {
|
||||
policy = "write"
|
||||
}
|
||||
node "node1" {
|
||||
policy = "write"
|
||||
}
|
||||
`
|
||||
authorizer, err := acl.NewAuthorizerFromRules(
|
||||
"1", 0, rules, acl.SyntaxCurrent,
|
||||
&acl.Config{WildcardName: structs.WildcardSpecifier},
|
||||
nil)
|
||||
require.NoError(t, err)
|
||||
authorizer = acl.NewChainedAuthorizer([]acl.Authorizer{authorizer, acl.DenyAll()})
|
||||
require.Equal(t, acl.Deny, authorizer.NodeRead("denied", nil))
|
||||
|
||||
// TODO: is there any easy way to do this with the acl package?
|
||||
token := "this-token-is-good"
|
||||
backend.authorizer = func(tok string) acl.Authorizer {
|
||||
if tok == token {
|
||||
return authorizer
|
||||
}
|
||||
return acl.DenyAll()
|
||||
}
|
||||
|
||||
ids := newCounter()
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||
t.Cleanup(cancel)
|
||||
|
||||
conn, err := gogrpc.DialContext(ctx, addr.String(), gogrpc.WithInsecure())
|
||||
require.NoError(t, err)
|
||||
t.Cleanup(logError(t, conn.Close))
|
||||
streamClient := pbsubscribe.NewStateChangeSubscriptionClient(conn)
|
||||
|
||||
streamHandle, err := streamClient.Subscribe(ctx, &pbsubscribe.SubscribeRequest{
|
||||
Topic: pbsubscribe.Topic_ServiceHealth,
|
||||
Key: "foo",
|
||||
Token: token,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
chEvents := make(chan eventOrError, 0)
|
||||
go recvEvents(chEvents, streamHandle)
|
||||
|
||||
require.True(t, getEvent(t, chEvents).GetEndOfSnapshot())
|
||||
|
||||
tokenID, err := uuid.GenerateUUID()
|
||||
require.NoError(t, err)
|
||||
|
||||
aclToken := &structs.ACLToken{
|
||||
AccessorID: tokenID,
|
||||
SecretID: token,
|
||||
Rules: "",
|
||||
}
|
||||
require.NoError(t, backend.store.ACLTokenSet(ids.Next("update"), aclToken, false))
|
||||
|
||||
select {
|
||||
case item := <-chEvents:
|
||||
require.Error(t, item.err, "got event: %v", item.event)
|
||||
s, _ := status.FromError(item.err)
|
||||
require.Equal(t, codes.Aborted, s.Code())
|
||||
case <-time.After(2 * time.Second):
|
||||
t.Fatalf("timeout waiting for aborted error")
|
||||
}
|
||||
}
|
||||
|
||||
func logError(t *testing.T, f func() error) func() {
|
||||
return func() {
|
||||
if err := f(); err != nil {
|
||||
t.Logf(err.Error())
|
||||
}
|
||||
}
|
||||
}
|
@ -11,6 +11,7 @@ import (
|
||||
"github.com/hashicorp/consul/agent/cache"
|
||||
"github.com/hashicorp/consul/agent/config"
|
||||
"github.com/hashicorp/consul/agent/consul"
|
||||
"github.com/hashicorp/consul/agent/grpc"
|
||||
"github.com/hashicorp/consul/agent/grpc/resolver"
|
||||
"github.com/hashicorp/consul/agent/pool"
|
||||
"github.com/hashicorp/consul/agent/router"
|
||||
@ -86,6 +87,8 @@ func NewBaseDeps(configLoader ConfigLoader, logOut io.Writer) (BaseDeps, error)
|
||||
// TODO(streaming): setConfig.Scheme name for tests
|
||||
builder := resolver.NewServerResolverBuilder(resolver.Config{})
|
||||
resolver.RegisterWithGRPC(builder)
|
||||
d.GRPCConnPool = grpc.NewClientConnPool(builder, grpc.TLSWrapper(d.TLSConfigurator.OutgoingRPCWrapper()))
|
||||
|
||||
d.Router = router.NewRouter(d.Logger, cfg.Datacenter, fmt.Sprintf("%s.%s", cfg.NodeName, cfg.Datacenter), builder)
|
||||
|
||||
acConf := autoconf.Config{
|
||||
|
@ -19,6 +19,7 @@ import (
|
||||
"github.com/hashicorp/serf/coordinate"
|
||||
"github.com/mitchellh/hashstructure"
|
||||
|
||||
"github.com/hashicorp/consul/acl"
|
||||
"github.com/hashicorp/consul/agent/cache"
|
||||
"github.com/hashicorp/consul/api"
|
||||
"github.com/hashicorp/consul/lib"
|
||||
@ -1579,6 +1580,25 @@ func (csn *CheckServiceNode) BestAddress(wan bool) (string, int) {
|
||||
return addr, port
|
||||
}
|
||||
|
||||
func (csn *CheckServiceNode) CanRead(authz acl.Authorizer) acl.EnforcementDecision {
|
||||
if csn.Node == nil || csn.Service == nil {
|
||||
return acl.Deny
|
||||
}
|
||||
|
||||
// TODO(streaming): add enterprise test that uses namespaces
|
||||
authzContext := new(acl.AuthorizerContext)
|
||||
csn.Service.FillAuthzContext(authzContext)
|
||||
|
||||
if authz.NodeRead(csn.Node.Node, authzContext) != acl.Allow {
|
||||
return acl.Deny
|
||||
}
|
||||
|
||||
if authz.ServiceRead(csn.Service.Service, authzContext) != acl.Allow {
|
||||
return acl.Deny
|
||||
}
|
||||
return acl.Allow
|
||||
}
|
||||
|
||||
type CheckServiceNodes []CheckServiceNode
|
||||
|
||||
// Shuffle does an in-place random shuffle using the Fisher-Yates algorithm.
|
||||
|
@ -8,13 +8,15 @@ import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/hashicorp/consul/acl"
|
||||
"github.com/hashicorp/consul/agent/cache"
|
||||
"github.com/hashicorp/consul/api"
|
||||
"github.com/hashicorp/consul/lib"
|
||||
"github.com/hashicorp/consul/sdk/testutil"
|
||||
"github.com/hashicorp/consul/types"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestEncodeDecode(t *testing.T) {
|
||||
@ -1152,7 +1154,7 @@ func TestStructs_HealthCheck_Clone(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestStructs_CheckServiceNodes_Shuffle(t *testing.T) {
|
||||
func TestCheckServiceNodes_Shuffle(t *testing.T) {
|
||||
// Make a huge list of nodes.
|
||||
var nodes CheckServiceNodes
|
||||
for i := 0; i < 100; i++ {
|
||||
@ -1185,7 +1187,7 @@ func TestStructs_CheckServiceNodes_Shuffle(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestStructs_CheckServiceNodes_Filter(t *testing.T) {
|
||||
func TestCheckServiceNodes_Filter(t *testing.T) {
|
||||
nodes := CheckServiceNodes{
|
||||
CheckServiceNode{
|
||||
Node: &Node{
|
||||
@ -1288,6 +1290,79 @@ func TestStructs_CheckServiceNodes_Filter(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestCheckServiceNodes_CanRead(t *testing.T) {
|
||||
type testCase struct {
|
||||
name string
|
||||
csn CheckServiceNode
|
||||
authz acl.Authorizer
|
||||
expected acl.EnforcementDecision
|
||||
}
|
||||
|
||||
fn := func(t *testing.T, tc testCase) {
|
||||
actual := tc.csn.CanRead(tc.authz)
|
||||
require.Equal(t, tc.expected, actual)
|
||||
}
|
||||
|
||||
var testCases = []testCase{
|
||||
{
|
||||
name: "empty",
|
||||
expected: acl.Deny,
|
||||
},
|
||||
{
|
||||
name: "node read not authorized",
|
||||
csn: CheckServiceNode{
|
||||
Node: &Node{Node: "name"},
|
||||
Service: &NodeService{Service: "service-name"},
|
||||
},
|
||||
authz: aclAuthorizerCheckServiceNode{allowService: true},
|
||||
expected: acl.Deny,
|
||||
},
|
||||
{
|
||||
name: "service read not authorized",
|
||||
csn: CheckServiceNode{
|
||||
Node: &Node{Node: "name"},
|
||||
Service: &NodeService{Service: "service-name"},
|
||||
},
|
||||
authz: aclAuthorizerCheckServiceNode{allowNode: true},
|
||||
expected: acl.Deny,
|
||||
},
|
||||
{
|
||||
name: "read authorized",
|
||||
csn: CheckServiceNode{
|
||||
Node: &Node{Node: "name"},
|
||||
Service: &NodeService{Service: "service-name"},
|
||||
},
|
||||
authz: acl.AllowAll(),
|
||||
expected: acl.Allow,
|
||||
},
|
||||
}
|
||||
for _, tc := range testCases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
fn(t, tc)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
type aclAuthorizerCheckServiceNode struct {
|
||||
acl.Authorizer
|
||||
allowNode bool
|
||||
allowService bool
|
||||
}
|
||||
|
||||
func (a aclAuthorizerCheckServiceNode) ServiceRead(string, *acl.AuthorizerContext) acl.EnforcementDecision {
|
||||
if a.allowService {
|
||||
return acl.Allow
|
||||
}
|
||||
return acl.Deny
|
||||
}
|
||||
|
||||
func (a aclAuthorizerCheckServiceNode) NodeRead(string, *acl.AuthorizerContext) acl.EnforcementDecision {
|
||||
if a.allowNode {
|
||||
return acl.Allow
|
||||
}
|
||||
return acl.Deny
|
||||
}
|
||||
|
||||
func TestStructs_DirEntry_Clone(t *testing.T) {
|
||||
e := &DirEntry{
|
||||
LockIndex: 5,
|
||||
|
Loading…
x
Reference in New Issue
Block a user