2020-09-08 19:22:35 +00:00
|
|
|
package subscribe
|
|
|
|
|
|
|
|
import (
|
2020-09-08 21:31:47 +00:00
|
|
|
"errors"
|
|
|
|
"fmt"
|
|
|
|
|
2020-09-28 22:52:31 +00:00
|
|
|
"github.com/hashicorp/go-hclog"
|
2020-09-08 21:31:47 +00:00
|
|
|
"google.golang.org/grpc"
|
|
|
|
"google.golang.org/grpc/codes"
|
|
|
|
"google.golang.org/grpc/status"
|
|
|
|
|
|
|
|
"github.com/hashicorp/consul/acl"
|
2020-09-25 23:40:10 +00:00
|
|
|
"github.com/hashicorp/consul/agent/consul/state"
|
2020-09-08 21:31:47 +00:00
|
|
|
"github.com/hashicorp/consul/agent/consul/stream"
|
2020-09-25 23:40:10 +00:00
|
|
|
"github.com/hashicorp/consul/proto/pbservice"
|
2020-09-08 21:31:47 +00:00
|
|
|
"github.com/hashicorp/consul/proto/pbsubscribe"
|
2020-09-08 19:22:35 +00:00
|
|
|
)
|
|
|
|
|
|
|
|
// Server implements a StateChangeSubscriptionServer for accepting SubscribeRequests,
|
|
|
|
// and sending events to the subscription topic.
|
|
|
|
type Server struct {
|
2020-09-08 21:31:47 +00:00
|
|
|
Backend Backend
|
|
|
|
Logger Logger
|
2020-09-08 19:22:35 +00:00
|
|
|
}
|
|
|
|
|
2020-09-28 22:52:31 +00:00
|
|
|
func NewServer(backend Backend, logger Logger) *Server {
|
|
|
|
return &Server{Backend: backend, Logger: logger}
|
|
|
|
}
|
|
|
|
|
2020-09-08 21:31:47 +00:00
|
|
|
type Logger interface {
|
|
|
|
Trace(msg string, args ...interface{})
|
2020-09-28 22:52:31 +00:00
|
|
|
With(args ...interface{}) hclog.Logger
|
2020-09-08 21:31:47 +00:00
|
|
|
}
|
2020-09-08 19:22:35 +00:00
|
|
|
|
2020-09-08 21:31:47 +00:00
|
|
|
var _ pbsubscribe.StateChangeSubscriptionServer = (*Server)(nil)
|
|
|
|
|
|
|
|
type Backend interface {
|
|
|
|
ResolveToken(token string) (acl.Authorizer, error)
|
|
|
|
Forward(dc string, f func(*grpc.ClientConn) error) (handled bool, err error)
|
|
|
|
Subscribe(req *stream.SubscribeRequest) (*stream.Subscription, error)
|
|
|
|
}
|
|
|
|
|
|
|
|
func (h *Server) Subscribe(req *pbsubscribe.SubscribeRequest, serverStream pbsubscribe.StateChangeSubscription_SubscribeServer) error {
|
2020-09-28 22:52:31 +00:00
|
|
|
logger := h.newLoggerForRequest(req)
|
|
|
|
handled, err := h.Backend.Forward(req.Datacenter, forwardToDC(req, serverStream, logger))
|
2020-09-08 21:31:47 +00:00
|
|
|
if handled || err != nil {
|
|
|
|
return err
|
2020-09-08 19:22:35 +00:00
|
|
|
}
|
|
|
|
|
2020-09-28 22:52:31 +00:00
|
|
|
logger.Trace("new subscription")
|
|
|
|
defer logger.Trace("subscription closed")
|
2020-09-08 19:22:35 +00:00
|
|
|
|
|
|
|
// Resolve the token and create the ACL filter.
|
2020-09-28 22:52:31 +00:00
|
|
|
// TODO(streaming): handle token expiry gracefully...
|
2020-09-08 21:31:47 +00:00
|
|
|
authz, err := h.Backend.ResolveToken(req.Token)
|
2020-09-08 19:22:35 +00:00
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2020-09-08 21:31:47 +00:00
|
|
|
sub, err := h.Backend.Subscribe(toStreamSubscribeRequest(req))
|
2020-09-08 19:22:35 +00:00
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
2020-09-08 21:31:47 +00:00
|
|
|
defer sub.Unsubscribe()
|
2020-09-08 19:22:35 +00:00
|
|
|
|
2020-09-08 21:31:47 +00:00
|
|
|
ctx := serverStream.Context()
|
2020-09-28 22:52:31 +00:00
|
|
|
|
|
|
|
elog := &eventLogger{logger: logger}
|
2020-09-08 19:22:35 +00:00
|
|
|
for {
|
2020-09-08 21:31:47 +00:00
|
|
|
events, err := sub.Next(ctx)
|
|
|
|
switch {
|
|
|
|
case errors.Is(err, stream.ErrSubscriptionClosed):
|
2020-09-28 22:52:31 +00:00
|
|
|
logger.Trace("subscription reset by server")
|
2020-09-08 21:31:47 +00:00
|
|
|
return status.Error(codes.Aborted, err.Error())
|
|
|
|
case err != nil:
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
events = filterStreamEvents(authz, events)
|
|
|
|
if len(events) == 0 {
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
2020-09-28 22:52:31 +00:00
|
|
|
elog.Trace(events)
|
2020-09-08 21:31:47 +00:00
|
|
|
e := newEventFromStreamEvents(req, events)
|
|
|
|
if err := serverStream.Send(e); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// TODO: can be replaced by mog conversion
|
|
|
|
func toStreamSubscribeRequest(req *pbsubscribe.SubscribeRequest) *stream.SubscribeRequest {
|
|
|
|
return &stream.SubscribeRequest{
|
|
|
|
Topic: req.Topic,
|
|
|
|
Key: req.Key,
|
|
|
|
Token: req.Token,
|
|
|
|
Index: req.Index,
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2020-09-28 22:52:31 +00:00
|
|
|
func forwardToDC(
|
2020-09-08 21:31:47 +00:00
|
|
|
req *pbsubscribe.SubscribeRequest,
|
|
|
|
serverStream pbsubscribe.StateChangeSubscription_SubscribeServer,
|
2020-09-28 22:52:31 +00:00
|
|
|
logger Logger,
|
2020-09-08 21:31:47 +00:00
|
|
|
) func(conn *grpc.ClientConn) error {
|
|
|
|
return func(conn *grpc.ClientConn) error {
|
2020-09-28 22:52:31 +00:00
|
|
|
logger.Trace("forwarding to another DC")
|
|
|
|
defer logger.Trace("forwarded stream closed")
|
2020-09-08 21:31:47 +00:00
|
|
|
|
|
|
|
client := pbsubscribe.NewStateChangeSubscriptionClient(conn)
|
|
|
|
streamHandle, err := client.Subscribe(serverStream.Context(), req)
|
2020-09-08 19:22:35 +00:00
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2020-09-08 21:31:47 +00:00
|
|
|
for {
|
|
|
|
event, err := streamHandle.Recv()
|
|
|
|
if err != nil {
|
2020-09-08 19:22:35 +00:00
|
|
|
return err
|
|
|
|
}
|
2020-09-08 21:31:47 +00:00
|
|
|
if err := serverStream.Send(event); err != nil {
|
2020-09-08 19:22:35 +00:00
|
|
|
return err
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2020-09-08 21:31:47 +00:00
|
|
|
// filterStreamEvents to only those allowed by the acl token.
|
|
|
|
func filterStreamEvents(authz acl.Authorizer, events []stream.Event) []stream.Event {
|
2020-09-28 22:52:31 +00:00
|
|
|
// authz will be nil when ACLs are disabled
|
2020-09-08 21:31:47 +00:00
|
|
|
if authz == nil || len(events) == 0 {
|
|
|
|
return events
|
|
|
|
}
|
2020-09-08 19:22:35 +00:00
|
|
|
|
2020-09-08 21:31:47 +00:00
|
|
|
// Fast path for the common case of only 1 event since we can avoid slice
|
|
|
|
// allocation in the hot path of every single update event delivered in vast
|
|
|
|
// majority of cases with this. Note that this is called _per event/item_ when
|
|
|
|
// sending snapshots which is a lot worse than being called once on regular
|
|
|
|
// result.
|
|
|
|
if len(events) == 1 {
|
|
|
|
if enforceACL(authz, events[0]) == acl.Allow {
|
|
|
|
return events
|
|
|
|
}
|
|
|
|
return nil
|
2020-09-08 19:22:35 +00:00
|
|
|
}
|
|
|
|
|
2020-09-08 21:31:47 +00:00
|
|
|
var filtered []stream.Event
|
|
|
|
for idx := range events {
|
|
|
|
event := events[idx]
|
|
|
|
if enforceACL(authz, event) == acl.Allow {
|
|
|
|
filtered = append(filtered, event)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return filtered
|
|
|
|
}
|
2020-09-08 19:22:35 +00:00
|
|
|
|
2020-09-08 21:31:47 +00:00
|
|
|
func newEventFromStreamEvents(req *pbsubscribe.SubscribeRequest, events []stream.Event) *pbsubscribe.Event {
|
|
|
|
e := &pbsubscribe.Event{
|
|
|
|
Topic: req.Topic,
|
|
|
|
Key: req.Key,
|
|
|
|
Index: events[0].Index,
|
|
|
|
}
|
2020-09-25 23:40:10 +00:00
|
|
|
|
2020-09-08 21:31:47 +00:00
|
|
|
if len(events) == 1 {
|
2020-09-25 23:40:10 +00:00
|
|
|
event := events[0]
|
|
|
|
// TODO: refactor so these are only checked once, instead of 3 times.
|
|
|
|
switch {
|
|
|
|
case event.IsEndOfSnapshot():
|
|
|
|
e.Payload = &pbsubscribe.Event_EndOfSnapshot{EndOfSnapshot: true}
|
|
|
|
return e
|
2020-10-02 17:55:41 +00:00
|
|
|
case event.IsNewSnapshotToFollow():
|
|
|
|
e.Payload = &pbsubscribe.Event_NewSnapshotToFollow{NewSnapshotToFollow: true}
|
2020-09-25 23:40:10 +00:00
|
|
|
return e
|
|
|
|
}
|
|
|
|
|
|
|
|
setPayload(e, event.Payload)
|
2020-09-08 21:31:47 +00:00
|
|
|
return e
|
|
|
|
}
|
2020-09-08 19:22:35 +00:00
|
|
|
|
2020-09-08 21:31:47 +00:00
|
|
|
e.Payload = &pbsubscribe.Event_EventBatch{
|
|
|
|
EventBatch: &pbsubscribe.EventBatch{
|
|
|
|
Events: batchEventsFromEventSlice(events),
|
|
|
|
},
|
2020-09-08 19:22:35 +00:00
|
|
|
}
|
2020-09-08 21:31:47 +00:00
|
|
|
return e
|
|
|
|
}
|
2020-09-08 19:22:35 +00:00
|
|
|
|
2020-09-08 21:31:47 +00:00
|
|
|
func setPayload(e *pbsubscribe.Event, payload interface{}) {
|
|
|
|
switch p := payload.(type) {
|
|
|
|
case state.EventPayloadCheckServiceNode:
|
|
|
|
e.Payload = &pbsubscribe.Event_ServiceHealth{
|
|
|
|
ServiceHealth: &pbsubscribe.ServiceHealthUpdate{
|
|
|
|
Op: p.Op,
|
|
|
|
// TODO: this could be cached
|
|
|
|
CheckServiceNode: pbservice.NewCheckServiceNodeFromStructs(p.Value),
|
|
|
|
},
|
2020-09-08 19:22:35 +00:00
|
|
|
}
|
2020-09-08 21:31:47 +00:00
|
|
|
default:
|
|
|
|
panic(fmt.Sprintf("unexpected payload: %T: %#v", p, p))
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func batchEventsFromEventSlice(events []stream.Event) []*pbsubscribe.Event {
|
|
|
|
result := make([]*pbsubscribe.Event, len(events))
|
|
|
|
for i := range events {
|
|
|
|
event := events[i]
|
|
|
|
result[i] = &pbsubscribe.Event{Key: event.Key, Index: event.Index}
|
|
|
|
setPayload(result[i], event.Payload)
|
2020-09-08 19:22:35 +00:00
|
|
|
}
|
2020-09-08 21:31:47 +00:00
|
|
|
return result
|
2020-09-08 19:22:35 +00:00
|
|
|
}
|