consul/agent/grpc-internal/services/subscribe/subscribe.go

132 lines
3.5 KiB
Go

// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1
package subscribe
import (
"errors"
"github.com/hashicorp/go-hclog"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/consul/state"
"github.com/hashicorp/consul/agent/consul/stream"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/proto/private/pbsubscribe"
)
// Server implements a StateChangeSubscriptionServer: it accepts
// SubscribeRequests and streams the matching events back to the caller.
type Server struct {
	// Backend supplies the token resolution, request forwarding, and event
	// subscription operations that Subscribe relies on.
	Backend Backend

	// Logger is the base logger; Subscribe derives a per-request logger from
	// it via newLoggerForRequest.
	Logger Logger
}
// NewServer returns a newly allocated Server whose Backend and Logger fields
// are set to the supplied values. Neither argument is validated here.
func NewServer(backend Backend, logger Logger) *Server {
	srv := new(Server)
	srv.Backend = backend
	srv.Logger = logger
	return srv
}
// Logger is the logging behaviour Server depends on: trace-level output plus
// the ability to derive a logger carrying extra key/value context.
// NOTE(review): the signatures look chosen so that an hclog.Logger satisfies
// this interface — confirm against the hclog package.
type Logger interface {
	// Trace emits msg at trace level with optional key/value args.
	Trace(msg string, args ...interface{})
	// With returns an hclog.Logger built from the receiver and args.
	With(args ...interface{}) hclog.Logger
}
// Compile-time assertion that *Server satisfies
// pbsubscribe.StateChangeSubscriptionServer.
var _ pbsubscribe.StateChangeSubscriptionServer = (*Server)(nil)
// Backend is the set of operations Server.Subscribe needs from the rest of
// the agent.
type Backend interface {
	// ResolveTokenAndDefaultMeta resolves token to an acl.Authorizer. It is
	// handed a pointer to the request's enterprise meta.
	// NOTE(review): the name suggests entMeta may be filled with defaults in
	// place — confirm against the implementation.
	ResolveTokenAndDefaultMeta(token string, entMeta *acl.EnterpriseMeta, authzContext *acl.AuthorizerContext) (acl.Authorizer, error)
	// Forward is given the request and a callback that takes a
	// *grpc.ClientConn. When handled is true, Subscribe returns err
	// immediately and does not serve the request locally.
	Forward(info structs.RPCInfo, f func(*grpc.ClientConn) error) (handled bool, err error)
	// Subscribe opens a stream.Subscription for req; Subscribe (the RPC)
	// unsubscribes it when the handler returns.
	Subscribe(req *stream.SubscribeRequest) (*stream.Subscription, error)
}
// Subscribe handles one streaming subscription request.
//
// The request is first offered to Backend.Forward; if the backend reports it
// as handled, or forwarding fails, that result is returned as-is. Otherwise
// the request token is resolved to an authorizer, the request is converted to
// a stream.SubscribeRequest (a conversion failure is reported as
// codes.InvalidArgument), and a subscription is opened. Events are then read
// in a loop, and every event whose payload passes HasReadPermission for the
// resolved authorizer is sent on serverStream.
//
// The loop ends, and the subscription is unsubscribed, when:
//   - the subscription reports stream.ErrSubForceClosed (returned as
//     codes.Aborted);
//   - the subscription reports stream.ErrACLChanged (the token is resolved
//     again; a resolution error is returned directly, otherwise the stream is
//     ended with codes.Aborted);
//   - reading the next event or sending on serverStream returns any other
//     error, which is returned unchanged.
func (h *Server) Subscribe(req *pbsubscribe.SubscribeRequest, serverStream pbsubscribe.StateChangeSubscription_SubscribeServer) error {
	logger := newLoggerForRequest(h.Logger, req)

	// Offer the request to the backend for forwarding before doing any local
	// work; a handled request (or a forwarding error) ends the call here.
	if handled, err := h.Backend.Forward(req, forwardToDC(req, serverStream, logger)); handled || err != nil {
		return err
	}

	logger.Trace("new subscription")
	defer logger.Trace("subscription closed")

	entMeta := req.EnterpriseMeta()
	authz, err := h.Backend.ResolveTokenAndDefaultMeta(req.Token, &entMeta, nil)
	if err != nil {
		return err
	}

	streamReq, err := state.PBToStreamSubscribeRequest(req, entMeta)
	if err != nil {
		return status.Error(codes.InvalidArgument, err.Error())
	}

	subscription, err := h.Backend.Subscribe(streamReq)
	if err != nil {
		return err
	}
	defer subscription.Unsubscribe()

	var (
		ctx    = serverStream.Context()
		tracer = &eventLogger{logger: logger}
	)
	for {
		event, nextErr := subscription.Next(ctx)

		if errors.Is(nextErr, stream.ErrSubForceClosed) {
			logger.Trace("subscription reset by server")
			return status.Error(codes.Aborted, nextErr.Error())
		}
		if errors.Is(nextErr, stream.ErrACLChanged) {
			logger.Trace("ACL change occurred; re-authenticating")
			// Only the error matters here: the freshly resolved authorizer is
			// discarded because the stream is ended either way.
			if _, authzErr := h.Backend.ResolveTokenAndDefaultMeta(req.Token, &entMeta, nil); authzErr != nil {
				return authzErr
			}
			// Otherwise, abort the stream so the client re-subscribes.
			return status.Error(codes.Aborted, nextErr.Error())
		}
		if nextErr != nil {
			return nextErr
		}

		// Skip events the resolved authorizer has no read permission for.
		if !event.Payload.HasReadPermission(authz) {
			continue
		}

		tracer.Trace(event)

		// TODO: This conversion could be cached if needed
		if sendErr := serverStream.Send(event.Payload.ToSubscriptionEvent(event.Index)); sendErr != nil {
			return sendErr
		}
	}
}
// forwardToDC builds the callback passed to Backend.Forward. Given a client
// connection, the callback opens a Subscribe stream on it with the original
// request, using serverStream's context, and copies every received event to
// serverStream. It returns the first error from opening the stream,
// receiving, or sending; it never returns nil once the copy loop has started.
func forwardToDC(
	req *pbsubscribe.SubscribeRequest,
	serverStream pbsubscribe.StateChangeSubscription_SubscribeServer,
	logger Logger,
) func(conn *grpc.ClientConn) error {
	relay := func(conn *grpc.ClientConn) error {
		logger.Trace("forwarding to another DC")
		defer logger.Trace("forwarded stream closed")

		upstream, err := pbsubscribe.NewStateChangeSubscriptionClient(conn).Subscribe(serverStream.Context(), req)
		if err != nil {
			return err
		}

		for {
			// Receive one event and, if that worked, pass it straight on;
			// whichever step fails first ends the relay.
			event, err := upstream.Recv()
			if err == nil {
				err = serverStream.Send(event)
			}
			if err != nil {
				return err
			}
		}
	}
	return relay
}