subscirbe: extract streamID and logging from Subscribe

By extracting all of the tracing logic the core logic of the Subscribe
endpoint is much easier to read.
This commit is contained in:
Daniel Nephin 2020-09-28 18:52:31 -04:00
parent 9e4ebacb05
commit dbb8bd679f
4 changed files with 109 additions and 85 deletions

View File

@ -18,6 +18,16 @@ import (
"time"
metrics "github.com/armon/go-metrics"
connlimit "github.com/hashicorp/go-connlimit"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-memdb"
"github.com/hashicorp/memberlist"
"github.com/hashicorp/raft"
raftboltdb "github.com/hashicorp/raft-boltdb"
"github.com/hashicorp/serf/serf"
"golang.org/x/time/rate"
"google.golang.org/grpc"
"github.com/hashicorp/consul/acl"
ca "github.com/hashicorp/consul/agent/connect/ca"
"github.com/hashicorp/consul/agent/consul/authmethod"
@ -38,15 +48,6 @@ import (
"github.com/hashicorp/consul/proto/pbsubscribe"
"github.com/hashicorp/consul/tlsutil"
"github.com/hashicorp/consul/types"
connlimit "github.com/hashicorp/go-connlimit"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-memdb"
"github.com/hashicorp/memberlist"
"github.com/hashicorp/raft"
raftboltdb "github.com/hashicorp/raft-boltdb"
"github.com/hashicorp/serf/serf"
"golang.org/x/time/rate"
"google.golang.org/grpc"
)
// These are the protocol versions that Consul can _understand_. These are
@ -615,10 +616,9 @@ func newGRPCHandlerFromConfig(deps Deps, config *Config, s *Server) connHandler
}
register := func(srv *grpc.Server) {
pbsubscribe.RegisterStateChangeSubscriptionServer(srv, &subscribe.Server{
Backend: &subscribeBackend{srv: s, connPool: deps.GRPCConnPool},
Logger: deps.Logger.Named("grpc-api.subscription"),
})
pbsubscribe.RegisterStateChangeSubscriptionServer(srv, subscribe.NewServer(
&subscribeBackend{srv: s, connPool: deps.GRPCConnPool},
deps.Logger.Named("grpc-api.subscription")))
}
return agentgrpc.NewHandler(config.RPCAddr, register)
}

71
agent/subscribe/logger.go Normal file
View File

@ -0,0 +1,71 @@
package subscribe
import (
"sync"
"time"
"github.com/hashicorp/go-uuid"
"github.com/hashicorp/consul/agent/consul/stream"
"github.com/hashicorp/consul/proto/pbsubscribe"
)
// streamID is used in logs as a unique identifier for a subscription. The value
// is created lazily on the first call to String() so that we do not call it
// if trace logging is disabled.
// If a random UUID can not be created, defaults to the current time formatted
// as RFC3339Nano.
//
// TODO(banks) it might be nice one day to replace this with OpenTracing ID
// if one is set etc. but probably pointless until we support that properly
// in other places so it's actually propagated properly. For now this just
// makes lifetime of a stream more traceable in our regular server logs for
// debugging/dev.
type streamID struct {
once sync.Once
id string
}
func (s *streamID) String() string {
s.once.Do(func() {
var err error
s.id, err = uuid.GenerateUUID()
if err != nil {
s.id = time.Now().Format(time.RFC3339Nano)
}
})
return s.id
}
func (h *Server) newLoggerForRequest(req *pbsubscribe.SubscribeRequest) Logger {
return h.Logger.With(
"topic", req.Topic.String(),
"dc", req.Datacenter,
"key", req.Key,
"index", req.Index,
"stream_id", &streamID{})
}
type eventLogger struct {
logger Logger
snapshotDone bool
count uint64
}
func (l *eventLogger) Trace(e []stream.Event) {
if len(e) == 0 {
return
}
first := e[0]
switch {
case first.IsEndOfSnapshot() || first.IsEndOfEmptySnapshot():
l.snapshotDone = true
l.logger.Trace("snapshot complete", "index", first.Index, "sent", l.count)
case l.snapshotDone:
l.logger.Trace("sending events", "index", first.Index, "sent", l.count, "batch_size", len(e))
}
l.count += uint64(len(e))
}

View File

@ -4,7 +4,7 @@ import (
"errors"
"fmt"
"github.com/hashicorp/go-uuid"
"github.com/hashicorp/go-hclog"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
@ -23,9 +23,13 @@ type Server struct {
Logger Logger
}
func NewServer(backend Backend, logger Logger) *Server {
return &Server{Backend: backend, Logger: logger}
}
type Logger interface {
IsTrace() bool
Trace(msg string, args ...interface{})
With(args ...interface{}) hclog.Logger
}
var _ pbsubscribe.StateChangeSubscriptionServer = (*Server)(nil)
@ -37,41 +41,17 @@ type Backend interface {
}
func (h *Server) Subscribe(req *pbsubscribe.SubscribeRequest, serverStream pbsubscribe.StateChangeSubscription_SubscribeServer) error {
// streamID is just used for message correlation in trace logs and not
// populated normally.
var streamID string
if h.Logger.IsTrace() {
// TODO(banks) it might be nice one day to replace this with OpenTracing ID
// if one is set etc. but probably pointless until we support that properly
// in other places so it's actually propagated properly. For now this just
// makes lifetime of a stream more traceable in our regular server logs for
// debugging/dev.
var err error
streamID, err = uuid.GenerateUUID()
if err != nil {
return err
}
}
// TODO: add fields to logger and pass logger around instead of streamID
handled, err := h.Backend.Forward(req.Datacenter, h.forwardToDC(req, serverStream, streamID))
logger := h.newLoggerForRequest(req)
handled, err := h.Backend.Forward(req.Datacenter, forwardToDC(req, serverStream, logger))
if handled || err != nil {
return err
}
h.Logger.Trace("new subscription",
"topic", req.Topic.String(),
"key", req.Key,
"index", req.Index,
"stream_id", streamID,
)
var sentCount uint64
defer h.Logger.Trace("subscription closed", "stream_id", streamID)
logger.Trace("new subscription")
defer logger.Trace("subscription closed")
// Resolve the token and create the ACL filter.
// TODO: handle token expiry gracefully...
// TODO(streaming): handle token expiry gracefully...
authz, err := h.Backend.ResolveToken(req.Token)
if err != nil {
return err
@ -84,12 +64,13 @@ func (h *Server) Subscribe(req *pbsubscribe.SubscribeRequest, serverStream pbsub
defer sub.Unsubscribe()
ctx := serverStream.Context()
snapshotDone := false
elog := &eventLogger{logger: logger}
for {
events, err := sub.Next(ctx)
switch {
case errors.Is(err, stream.ErrSubscriptionClosed):
h.Logger.Trace("subscription reset by server", "stream_id", streamID)
logger.Trace("subscription reset by server")
return status.Error(codes.Aborted, err.Error())
case err != nil:
return err
@ -100,23 +81,7 @@ func (h *Server) Subscribe(req *pbsubscribe.SubscribeRequest, serverStream pbsub
continue
}
first := events[0]
switch {
case first.IsEndOfSnapshot() || first.IsEndOfEmptySnapshot():
snapshotDone = true
h.Logger.Trace("snapshot complete",
"index", first.Index, "sent", sentCount, "stream_id", streamID)
case snapshotDone:
h.Logger.Trace("sending events",
"index", first.Index,
"sent", sentCount,
"batch_size", len(events),
"stream_id", streamID,
)
}
sentCount += uint64(len(events))
elog.Trace(events)
e := newEventFromStreamEvents(req, events)
if err := serverStream.Send(e); err != nil {
return err
@ -134,26 +99,14 @@ func toStreamSubscribeRequest(req *pbsubscribe.SubscribeRequest) *stream.Subscri
}
}
func (h *Server) forwardToDC(
func forwardToDC(
req *pbsubscribe.SubscribeRequest,
serverStream pbsubscribe.StateChangeSubscription_SubscribeServer,
streamID string,
logger Logger,
) func(conn *grpc.ClientConn) error {
return func(conn *grpc.ClientConn) error {
h.Logger.Trace("forwarding to another DC",
"dc", req.Datacenter,
"topic", req.Topic.String(),
"key", req.Key,
"index", req.Index,
"stream_id", streamID,
)
defer func() {
h.Logger.Trace("forwarded stream closed",
"dc", req.Datacenter,
"stream_id", streamID,
)
}()
logger.Trace("forwarding to another DC")
defer logger.Trace("forwarded stream closed")
client := pbsubscribe.NewStateChangeSubscriptionClient(conn)
streamHandle, err := client.Subscribe(serverStream.Context(), req)
@ -175,7 +128,7 @@ func (h *Server) forwardToDC(
// filterStreamEvents to only those allowed by the acl token.
func filterStreamEvents(authz acl.Authorizer, events []stream.Event) []stream.Event {
// TODO: when is authz nil?
// authz will be nil when ACLs are disabled
if authz == nil || len(events) == 0 {
return events
}

View File

@ -32,7 +32,7 @@ import (
func TestServer_Subscribe_IntegrationWithBackend(t *testing.T) {
backend, err := newTestBackend()
require.NoError(t, err)
srv := &Server{Backend: backend, Logger: hclog.New(nil)}
srv := NewServer(backend, hclog.New(nil))
addr := newTestServer(t, srv)
ids := newCounter()
@ -373,11 +373,11 @@ func raftIndex(ids *counter, created, modified string) pbcommon.RaftIndex {
func TestServer_Subscribe_IntegrationWithBackend_ForwardToDC(t *testing.T) {
backendLocal, err := newTestBackend()
require.NoError(t, err)
addrLocal := newTestServer(t, &Server{Backend: backendLocal, Logger: hclog.New(nil)})
addrLocal := newTestServer(t, NewServer(backendLocal, hclog.New(nil)))
backendRemoteDC, err := newTestBackend()
require.NoError(t, err)
srvRemoteDC := &Server{Backend: backendRemoteDC, Logger: hclog.New(nil)}
srvRemoteDC := NewServer(backendRemoteDC, hclog.New(nil))
addrRemoteDC := newTestServer(t, srvRemoteDC)
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
@ -592,7 +592,7 @@ func TestServer_Subscribe_IntegrationWithBackend_FilterEventsByACLToken(t *testi
backend, err := newTestBackend()
require.NoError(t, err)
srv := &Server{Backend: backend, Logger: hclog.New(nil)}
srv := NewServer(backend, hclog.New(nil))
addr := newTestServer(t, srv)
// Create a policy for the test token.
@ -796,7 +796,7 @@ node "node1" {
func TestServer_Subscribe_IntegrationWithBackend_ACLUpdate(t *testing.T) {
backend, err := newTestBackend()
require.NoError(t, err)
srv := &Server{Backend: backend, Logger: hclog.New(nil)}
srv := NewServer(backend, hclog.New(nil))
addr := newTestServer(t, srv)
rules := `